From ede78ad231240f8200ccac60a32ecec07045b963 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 4 Aug 2016 12:00:37 +0200 Subject: [PATCH] Use primary terms as authority to fail shards (#19715) A primary shard currently instructs the master to fail a replica shard that it fails to replicate writes to before acknowledging the writes to the client. To ensure that the primary instructing the master to fail the replica is still the current primary in the cluster state on the master, it submits not only the identity of the replica shard to fail to the master but also its own shard identity. This can be problematic however when the primary is relocating. After primary relocation handoff but before the primary relocation target is activated, the primary relocation target is replicating writes through the authority of the primary relocation source. This means that the primary relocation target should probably send the identity of the primary relocation source as authority. However, this is not good enough either, as primary shard activation and shard failure instructions can arrive out-of-order. This means that the relocation target would have to send both relocation source and target identity as authority. Fortunately, there is another concept in the cluster state that represents this joint authority, namely primary terms. The primary term is only increased on initial assignment or when a replica is promoted. It stays the same however when a primary relocates. This commit changes ShardStateAction to rely on primary terms for shard authority. It also changes the wire format to only transmit ShardId and allocation id of the shard to fail (instead of the full ShardRouting), so that the same action can be used in a subsequent PR to remove allocation ids from the active allocation set for which there exist no ShardRouting in the cluster anymore. Last but not least, this commit also makes AllocationService less lenient, requiring ShardRouting instances that are passed to its applyStartedShards and applyFailedShards methods to exist in the routing table. ShardStateAction, which is calling these methods, now has the responsibility to resolve the ShardRouting objects that are to be started / failed, and remove duplicates. --- .../replication/ReplicationOperation.java | 8 +- .../TransportReplicationAction.java | 6 +- .../action/shard/ShardStateAction.java | 294 ++++++++++-------- .../routing/IndexShardRoutingTable.java | 2 +- .../cluster/routing/RoutingNodes.java | 18 +- .../cluster/routing/RoutingTable.java | 25 +- .../cluster/routing/ShardRouting.java | 23 +- .../routing/allocation/AllocationService.java | 205 ++++++------ .../allocation/FailedRerouteAllocation.java | 14 +- .../routing/allocation/RoutingAllocation.java | 3 +- .../allocation/StartedRerouteAllocation.java | 7 +- .../decider/ThrottlingAllocationDecider.java | 4 +- .../gateway/GatewayAllocator.java | 4 +- .../cluster/IndicesClusterStateService.java | 4 +- .../ReplicationOperationTests.java | 10 +- .../TransportReplicationActionTests.java | 9 +- ...rdFailedClusterStateTaskExecutorTests.java | 138 ++++---- .../action/shard/ShardStateActionTests.java | 33 +- .../cluster/routing/AllocationIdTests.java | 3 +- .../cluster/routing/ShardRoutingTests.java | 3 +- .../allocation/FailedShardsRoutingTests.java | 43 ++- .../allocation/StartedShardsRoutingTests.java | 51 +-- .../DiskThresholdDeciderUnitTests.java | 4 +- .../DiscoveryWithServiceDisruptionsIT.java | 2 +- .../zen/NodeJoinControllerTests.java | 2 +- .../ESIndexLevelReplicationTestCase.java | 2 +- .../indices/cluster/ClusterStateChanges.java | 37 ++- 27 files changed, 476 insertions(+), 478 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index af6d8b030ca..070840ca2ef 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -136,7 +136,7 @@ public class ReplicationOperation< } if (shard.relocating() && shard.relocatingNodeId().equals(localNodeId) == false) { - performOnReplica(shard.buildTargetRelocatingShard(), replicaRequest); + performOnReplica(shard.getTargetRelocatingShard(), replicaRequest); } } } @@ -167,7 +167,7 @@ public class ReplicationOperation< shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false)); String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard); logger.warn("[{}] {}", replicaException, shard.shardId(), message); - replicasProxy.failShard(shard, primary.routingEntry(), message, replicaException, + replicasProxy.failShard(shard, replicaRequest.primaryTerm(), message, replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded, ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded() @@ -327,7 +327,7 @@ public class ReplicationOperation< /** * Fail the specified shard, removing it from the current set of active shards * @param replica shard to fail - * @param primary the primary shard that requested the failure + * @param primaryTerm the primary term of the primary shard when requesting the failure * @param message a (short) description of the reason * @param exception the original exception which caused the ReplicationOperation to request the shard to be failed * @param onSuccess a callback to call when the shard has been successfully removed from the active set. @@ -335,7 +335,7 @@ public class ReplicationOperation< * by the master. * @param onIgnoredFailure a callback to call when failing a shard has failed, but it that failure can be safely ignored and the */ - void failShard(ShardRouting replica, ShardRouting primary, String message, Exception exception, Runnable onSuccess, + void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure); } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index a825cd1b9b5..8294ccfe0d6 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -866,10 +866,10 @@ public abstract class TransportReplicationAction< } @Override - public void failShard(ShardRouting replica, ShardRouting primary, String message, Exception exception, + public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess, Consumer onFailure, Consumer onIgnoredFailure) { - shardStateAction.shardFailed( - replica, primary, message, exception, + shardStateAction.remoteShardFailed( + replica, primaryTerm, message, exception, new ShardStateAction.Listener() { @Override public void onSuccess() { diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index e6a6dea7def..99b19e31a4e 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -29,9 +29,8 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.MasterNodeChangePredicate; import org.elasticsearch.cluster.NotMasterException; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocationService; @@ -64,11 +63,10 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Locale; -import java.util.Map; -import java.util.stream.Collectors; +import java.util.Set; public class ShardStateAction extends AbstractComponent { @@ -87,19 +85,19 @@ public class ShardStateAction extends AbstractComponent { this.clusterService = clusterService; this.threadPool = threadPool; - transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger)); - transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger)); + transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME, new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger)); + transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger)); } - private void sendShardAction(final String actionName, final ClusterStateObserver observer, final ShardRoutingEntry shardRoutingEntry, final Listener listener) { + private void sendShardAction(final String actionName, final ClusterStateObserver observer, final ShardEntry shardEntry, final Listener listener) { DiscoveryNode masterNode = observer.observedState().nodes().getMasterNode(); if (masterNode == null) { - logger.warn("{} no master known for action [{}] for shard [{}]", shardRoutingEntry.getShardRouting().shardId(), actionName, shardRoutingEntry.getShardRouting()); - waitForNewMasterAndRetry(actionName, observer, shardRoutingEntry, listener); + logger.warn("{} no master known for action [{}] for shard entry [{}]", shardEntry.shardId, actionName, shardEntry); + waitForNewMasterAndRetry(actionName, observer, shardEntry, listener); } else { - logger.debug("{} sending [{}] to [{}] for shard [{}]", shardRoutingEntry.getShardRouting().shardId(), actionName, masterNode.getId(), shardRoutingEntry); + logger.debug("{} sending [{}] to [{}] for shard entry [{}]", shardEntry.shardId, actionName, masterNode.getId(), shardEntry); transportService.sendRequest(masterNode, - actionName, shardRoutingEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { + actionName, shardEntry, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleResponse(TransportResponse.Empty response) { listener.onSuccess(); @@ -108,9 +106,9 @@ public class ShardStateAction extends AbstractComponent { @Override public void handleException(TransportException exp) { if (isMasterChannelException(exp)) { - waitForNewMasterAndRetry(actionName, observer, shardRoutingEntry, listener); + waitForNewMasterAndRetry(actionName, observer, shardEntry, listener); } else { - logger.warn("{} unexpected failure while sending request [{}] to [{}] for shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), actionName, masterNode, shardRoutingEntry); + logger.warn("{} unexpected failure while sending request [{}] to [{}] for shard entry [{}]", exp, shardEntry.shardId, actionName, masterNode, shardEntry); listener.onFailure(exp instanceof RemoteTransportException ? (Exception) (exp.getCause() instanceof Exception ? exp.getCause() : new ElasticsearchException(exp.getCause())) : exp); } } @@ -129,34 +127,46 @@ public class ShardStateAction extends AbstractComponent { } /** - * Send a shard failed request to the master node to update the - * cluster state. - * @param shardRouting the shard to fail - * @param sourceShardRouting the source shard requesting the failure (must be the shard itself, or the primary shard) + * Send a shard failed request to the master node to update the cluster state with the failure of a shard on another node. + * + * @param shardRouting the shard to fail + * @param primaryTerm the primary term associated with the primary shard that is failing the shard. * @param message the reason for the failure * @param failure the underlying cause of the failure * @param listener callback upon completion of the request */ - public void shardFailed(final ShardRouting shardRouting, ShardRouting sourceShardRouting, final String message, @Nullable final Exception failure, Listener listener) { + public void remoteShardFailed(final ShardRouting shardRouting, long primaryTerm, final String message, @Nullable final Exception failure, Listener listener) { + assert primaryTerm > 0L : "primary term should be strictly positive"; + shardFailed(shardRouting, primaryTerm, message, failure, listener); + } + + /** + * Send a shard failed request to the master node to update the cluster state when a shard on the local node failed. + */ + public void localShardFailed(final ShardRouting shardRouting, final String message, @Nullable final Exception failure, Listener listener) { + shardFailed(shardRouting, 0L, message, failure, listener); + } + + private void shardFailed(final ShardRouting shardRouting, long primaryTerm, final String message, @Nullable final Exception failure, Listener listener) { ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); - ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, sourceShardRouting, message, failure); - sendShardAction(SHARD_FAILED_ACTION_NAME, observer, shardRoutingEntry, listener); + ShardEntry shardEntry = new ShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message, failure); + sendShardAction(SHARD_FAILED_ACTION_NAME, observer, shardEntry, listener); } // visible for testing - protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) { + protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, ShardEntry shardEntry, Listener listener) { observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { if (logger.isTraceEnabled()) { - logger.trace("new cluster state [{}] after waiting for master election to fail shard [{}]", state.prettyPrint(), shardRoutingEntry); + logger.trace("new cluster state [{}] after waiting for master election to fail shard entry [{}]", state.prettyPrint(), shardEntry); } - sendShardAction(actionName, observer, shardRoutingEntry, listener); + sendShardAction(actionName, observer, shardEntry, listener); } @Override public void onClusterServiceClose() { - logger.warn("{} node closed while execution action [{}] for shard [{}]", shardRoutingEntry.failure, shardRoutingEntry.getShardRouting().shardId(), actionName, shardRoutingEntry.getShardRouting()); + logger.warn("{} node closed while execution action [{}] for shard entry [{}]", shardEntry.failure, shardEntry.shardId, actionName, shardEntry); listener.onFailure(new NodeClosedException(clusterService.localNode())); } @@ -168,7 +178,7 @@ public class ShardStateAction extends AbstractComponent { }, MasterNodeChangePredicate.INSTANCE); } - private static class ShardFailedTransportHandler implements TransportRequestHandler { + private static class ShardFailedTransportHandler implements TransportRequestHandler { private final ClusterService clusterService; private final ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor; private final ESLogger logger; @@ -180,8 +190,8 @@ public class ShardStateAction extends AbstractComponent { } @Override - public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { - logger.warn("{} received shard failed for {}", request.failure, request.shardRouting.shardId(), request); + public void messageReceived(ShardEntry request, TransportChannel channel) throws Exception { + logger.warn("{} received shard failed for {}", request.failure, request.shardId, request); clusterService.submitStateUpdateTask( "shard-failed", request, @@ -190,22 +200,22 @@ public class ShardStateAction extends AbstractComponent { new ClusterStateTaskListener() { @Override public void onFailure(String source, Exception e) { - logger.error("{} unexpected failure while failing shard [{}]", e, request.shardRouting.shardId(), request.shardRouting); + logger.error("{} unexpected failure while failing shard [{}]", e, request.shardId, request); try { channel.sendResponse(e); } catch (Exception channelException) { channelException.addSuppressed(e); - logger.warn("{} failed to send failure [{}] while failing shard [{}]", channelException, request.shardRouting.shardId(), e, request.shardRouting); + logger.warn("{} failed to send failure [{}] while failing shard [{}]", channelException, request.shardId, e, request); } } @Override public void onNoLongerMaster(String source) { - logger.error("{} no longer master while failing shard [{}]", request.shardRouting.shardId(), request.shardRouting); + logger.error("{} no longer master while failing shard [{}]", request.shardId, request); try { channel.sendResponse(new NotMasterException(source)); } catch (Exception channelException) { - logger.warn("{} failed to send no longer master while failing shard [{}]", channelException, request.shardRouting.shardId(), request.shardRouting); + logger.warn("{} failed to send no longer master while failing shard [{}]", channelException, request.shardId, request); } } @@ -214,7 +224,7 @@ public class ShardStateAction extends AbstractComponent { try { channel.sendResponse(TransportResponse.Empty.INSTANCE); } catch (Exception channelException) { - logger.warn("{} failed to send response while failing shard [{}]", channelException, request.shardRouting.shardId(), request.shardRouting); + logger.warn("{} failed to send response while failing shard [{}]", channelException, request.shardId, request); } } } @@ -222,63 +232,81 @@ public class ShardStateAction extends AbstractComponent { } } - static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor { + public static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor { private final AllocationService allocationService; private final RoutingService routingService; private final ESLogger logger; - ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RoutingService routingService, ESLogger logger) { + public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RoutingService routingService, ESLogger logger) { this.allocationService = allocationService; this.routingService = routingService; this.logger = logger; } @Override - public String describeTasks(List tasks) { - return tasks.stream().map(entry -> entry.getShardRouting().toString()).reduce((s1, s2) -> s1 + ", " + s2).orElse(""); - } + public BatchResult execute(ClusterState currentState, List tasks) throws Exception { + BatchResult.Builder batchResultBuilder = BatchResult.builder(); + List tasksToBeApplied = new ArrayList<>(); + List shardRoutingsToBeApplied = new ArrayList<>(); + Set seenShardRoutings = new HashSet<>(); // to prevent duplicates - @Override - public BatchResult execute(ClusterState currentState, List tasks) throws Exception { - BatchResult.Builder batchResultBuilder = BatchResult.builder(); + for (ShardEntry task : tasks) { + IndexMetaData indexMetaData = currentState.metaData().index(task.shardId.getIndex()); + if (indexMetaData == null) { + // tasks that correspond to non-existent shards are marked as successful + logger.debug("{} ignoring shard failed task [{}] (unknown index {})", task.shardId, task, task.shardId.getIndex()); + batchResultBuilder.success(task); + } else { + // non-local requests + if (task.primaryTerm > 0) { + long currentPrimaryTerm = indexMetaData.primaryTerm(task.shardId.id()); + if (currentPrimaryTerm != task.primaryTerm) { + assert currentPrimaryTerm > task.primaryTerm : "received a primary term with a higher term than in the " + + "current cluster state (received [" + task.primaryTerm + "] but current is [" + currentPrimaryTerm + "])"; + logger.debug("{} failing shard failed task [{}] (primary term {} does not match current term {})", task.shardId, + task, task.primaryTerm, indexMetaData.primaryTerm(task.shardId.id())); + batchResultBuilder.failure(task, new NoLongerPrimaryShardException( + task.shardId, + "primary term [" + task.primaryTerm + "] did not match current primary term [" + currentPrimaryTerm + "]")); + continue; + } + } - // partition tasks into those that correspond to shards - // that exist versus do not exist - Map> partition = - tasks.stream().collect(Collectors.groupingBy(task -> validateTask(currentState, task))); - - // tasks that correspond to non-existent shards are marked - // as successful - batchResultBuilder.successes(partition.getOrDefault(ValidationResult.SHARD_MISSING, Collections.emptyList())); + ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId); + if (matched == null) { + // tasks that correspond to non-existent shards are marked as successful + logger.debug("{} ignoring shard failed task [{}] (shard does not exist anymore)", task.shardId, task); + batchResultBuilder.success(task); + } else { + // remove duplicate actions as allocation service expects a clean list without duplicates + if (seenShardRoutings.contains(matched)) { + logger.trace("{} ignoring shard failed task [{}] (already scheduled to fail {})", task.shardId, task, matched); + tasksToBeApplied.add(task); + } else { + logger.debug("{} failing shard {} (shard failed task: [{}])", task.shardId, matched, task); + tasksToBeApplied.add(task); + shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(matched, task.message, task.failure)); + seenShardRoutings.add(matched); + } + } + } + } + assert tasksToBeApplied.size() >= shardRoutingsToBeApplied.size(); ClusterState maybeUpdatedState = currentState; - List tasksToFail = partition.getOrDefault(ValidationResult.VALID, Collections.emptyList()); try { - List failedShards = - tasksToFail - .stream() - .map(task -> new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure)) - .collect(Collectors.toList()); - RoutingAllocation.Result result = applyFailedShards(currentState, failedShards); + RoutingAllocation.Result result = applyFailedShards(currentState, shardRoutingsToBeApplied); if (result.changed()) { maybeUpdatedState = ClusterState.builder(currentState).routingResult(result).build(); } - batchResultBuilder.successes(tasksToFail); + batchResultBuilder.successes(tasksToBeApplied); } catch (Exception e) { + logger.warn("failed to apply failed shards {}", e, shardRoutingsToBeApplied); // failures are communicated back to the requester // cluster state will not be updated in this case - batchResultBuilder.failures(tasksToFail, e); + batchResultBuilder.failures(tasksToBeApplied, e); } - partition - .getOrDefault(ValidationResult.SOURCE_INVALID, Collections.emptyList()) - .forEach(task -> batchResultBuilder.failure( - task, - new NoLongerPrimaryShardException( - task.getShardRouting().shardId(), - "source shard [" + task.sourceShardRouting + "] is neither the local allocation nor the primary allocation") - )); - return batchResultBuilder.build(maybeUpdatedState); } @@ -287,36 +315,6 @@ public class ShardStateAction extends AbstractComponent { return allocationService.applyFailedShards(currentState, failedShards); } - private enum ValidationResult { - VALID, - SOURCE_INVALID, - SHARD_MISSING - } - - private ValidationResult validateTask(ClusterState currentState, ShardRoutingEntry task) { - - // non-local requests - if (!task.shardRouting.isSameAllocation(task.sourceShardRouting)) { - IndexShardRoutingTable indexShard = currentState.getRoutingTable().shardRoutingTableOrNull(task.shardRouting.shardId()); - if (indexShard == null) { - return ValidationResult.SOURCE_INVALID; - } - ShardRouting primaryShard = indexShard.primaryShard(); - if (primaryShard == null || !primaryShard.isSameAllocation(task.sourceShardRouting)) { - return ValidationResult.SOURCE_INVALID; - } - } - - RoutingNode routingNode = currentState.getRoutingNodes().node(task.getShardRouting().currentNodeId()); - if (routingNode != null) { - ShardRouting maybe = routingNode.getByShardId(task.getShardRouting().shardId()); - if (maybe != null && maybe.isSameAllocation(task.getShardRouting())) { - return ValidationResult.VALID; - } - } - return ValidationResult.SHARD_MISSING; - } - @Override public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { int numberOfUnassignedShards = clusterChangedEvent.state().getRoutingNodes().unassigned().size(); @@ -332,11 +330,11 @@ public class ShardStateAction extends AbstractComponent { public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener) { ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); - ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, shardRouting, message, null); - sendShardAction(SHARD_STARTED_ACTION_NAME, observer, shardRoutingEntry, listener); + ShardEntry shardEntry = new ShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), 0L, message, null); + sendShardAction(SHARD_STARTED_ACTION_NAME, observer, shardEntry, listener); } - private static class ShardStartedTransportHandler implements TransportRequestHandler { + private static class ShardStartedTransportHandler implements TransportRequestHandler { private final ClusterService clusterService; private final ShardStartedClusterStateTaskExecutor shardStartedClusterStateTaskExecutor; private final ESLogger logger; @@ -348,8 +346,8 @@ public class ShardStateAction extends AbstractComponent { } @Override - public void messageReceived(ShardRoutingEntry request, TransportChannel channel) throws Exception { - logger.debug("{} received shard started for [{}]", request.shardRouting.shardId(), request); + public void messageReceived(ShardEntry request, TransportChannel channel) throws Exception { + logger.debug("{} received shard started for [{}]", request.shardId, request); clusterService.submitStateUpdateTask( "shard-started", request, @@ -360,7 +358,7 @@ public class ShardStateAction extends AbstractComponent { } } - private static class ShardStartedClusterStateTaskExecutor implements ClusterStateTaskExecutor, ClusterStateTaskListener { + public static class ShardStartedClusterStateTaskExecutor implements ClusterStateTaskExecutor, ClusterStateTaskListener { private final AllocationService allocationService; private final ESLogger logger; @@ -370,17 +368,45 @@ public class ShardStateAction extends AbstractComponent { } @Override - public String describeTasks(List tasks) { - return tasks.stream().map(entry -> entry.getShardRouting().toString()).reduce((s1, s2) -> s1 + ", " + s2).orElse(""); - } - - @Override - public BatchResult execute(ClusterState currentState, List tasks) throws Exception { - BatchResult.Builder builder = BatchResult.builder(); + public BatchResult execute(ClusterState currentState, List tasks) throws Exception { + BatchResult.Builder builder = BatchResult.builder(); + List tasksToBeApplied = new ArrayList<>(); List shardRoutingsToBeApplied = new ArrayList<>(tasks.size()); - for (ShardRoutingEntry task : tasks) { - shardRoutingsToBeApplied.add(task.shardRouting); + Set seenShardRoutings = new HashSet<>(); // to prevent duplicates + for (ShardEntry task : tasks) { + assert task.primaryTerm == 0L : "shard is only started by itself: " + task; + + ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId); + if (matched == null) { + // tasks that correspond to non-existent shards are marked as successful. The reason is that we resend shard started + // events on every cluster state publishing that does not contain the shard as started yet. This means that old stale + // requests might still be in flight even after the shard has already been started or failed on the master. We just + // ignore these requests for now. + logger.debug("{} ignoring shard started task [{}] (shard does not exist anymore)", task.shardId, task); + builder.success(task); + } else { + if (matched.initializing() == false) { + assert matched.active() : "expected active shard routing for task " + task + " but found " + matched; + // same as above, this might have been a stale in-flight request, so we just ignore. + logger.debug("{} ignoring shard started task [{}] (shard exists but is not initializing: {})", task.shardId, task, + matched); + builder.success(task); + } else { + // remove duplicate actions as allocation service expects a clean list without duplicates + if (seenShardRoutings.contains(matched)) { + logger.trace("{} ignoring shard started task [{}] (already scheduled to start {})", task.shardId, task, matched); + tasksToBeApplied.add(task); + } else { + logger.debug("{} starting shard {} (shard started task: [{}])", task.shardId, matched, task); + tasksToBeApplied.add(task); + shardRoutingsToBeApplied.add(matched); + seenShardRoutings.add(matched); + } + } + } } + assert tasksToBeApplied.size() >= shardRoutingsToBeApplied.size(); + ClusterState maybeUpdatedState = currentState; try { RoutingAllocation.Result result = @@ -388,9 +414,10 @@ public class ShardStateAction extends AbstractComponent { if (result.changed()) { maybeUpdatedState = ClusterState.builder(currentState).routingResult(result).build(); } - builder.successes(tasks); + builder.successes(tasksToBeApplied); } catch (Exception e) { - builder.failures(tasks, e); + logger.warn("failed to apply started shards {}", e, shardRoutingsToBeApplied); + builder.failures(tasksToBeApplied, e); } return builder.build(maybeUpdatedState); @@ -402,31 +429,38 @@ public class ShardStateAction extends AbstractComponent { } } - public static class ShardRoutingEntry extends TransportRequest { - ShardRouting shardRouting; - ShardRouting sourceShardRouting; + public static class ShardEntry extends TransportRequest { + ShardId shardId; + String allocationId; + long primaryTerm; String message; Exception failure; - public ShardRoutingEntry() { + public ShardEntry() { } - ShardRoutingEntry(ShardRouting shardRouting, ShardRouting sourceShardRouting, String message, @Nullable Exception failure) { - this.shardRouting = shardRouting; - this.sourceShardRouting = sourceShardRouting; + public ShardEntry(ShardId shardId, String allocationId, long primaryTerm, String message, @Nullable Exception failure) { + this.shardId = shardId; + this.allocationId = allocationId; + this.primaryTerm = primaryTerm; this.message = message; this.failure = failure; } - public ShardRouting getShardRouting() { - return shardRouting; + public ShardId getShardId() { + return shardId; + } + + public String getAllocationId() { + return allocationId; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - shardRouting = new ShardRouting(in); - sourceShardRouting = new ShardRouting(in); + shardId = ShardId.readShardId(in); + allocationId = in.readString(); + primaryTerm = in.readVLong(); message = in.readString(); failure = in.readException(); } @@ -434,8 +468,9 @@ public class ShardStateAction extends AbstractComponent { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - shardRouting.writeTo(out); - sourceShardRouting.writeTo(out); + shardId.writeTo(out); + out.writeString(allocationId); + out.writeVLong(primaryTerm); out.writeString(message); out.writeException(failure); } @@ -443,8 +478,9 @@ public class ShardStateAction extends AbstractComponent { @Override public String toString() { List components = new ArrayList<>(4); - components.add("target shard [" + shardRouting + "]"); - components.add("source shard [" + sourceShardRouting + "]"); + components.add("shard id [" + shardId + "]"); + components.add("allocation id [" + allocationId + "]"); + components.add("primary term [" + primaryTerm + "]"); components.add("message [" + message + "]"); if (failure != null) { components.add("failure [" + ExceptionsHelper.detailedMessage(failure) + "]"); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index cddf6f98a54..619959923e9 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -98,7 +98,7 @@ public class IndexShardRoutingTable implements Iterable { } if (shard.relocating()) { // create the target initializing shard routing on the node the shard is relocating to - allInitializingShards.add(shard.buildTargetRelocatingShard()); + allInitializingShards.add(shard.getTargetRelocatingShard()); } if (shard.assignedToNode()) { assignedShards.add(shard); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 54f7cf2bb76..f453f3c35ca 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -108,7 +108,7 @@ public class RoutingNodes implements Iterable { k -> new LinkedHashMap<>()); // LinkedHashMap to preserve order // add the counterpart shard with relocatingNodeId reflecting the source from which // it's relocating from. - ShardRouting targetShardRouting = shard.buildTargetRelocatingShard(); + ShardRouting targetShardRouting = shard.getTargetRelocatingShard(); addInitialRecovery(targetShardRouting, indexShard.primary); previousValue = entries.put(targetShardRouting.shardId(), targetShardRouting); if (previousValue != null) { @@ -276,6 +276,20 @@ public class RoutingNodes implements Iterable { return replicaSet == null ? EMPTY : Collections.unmodifiableList(replicaSet); } + @Nullable + public ShardRouting getByAllocationId(ShardId shardId, String allocationId) { + final List replicaSet = assignedShards.get(shardId); + if (replicaSet == null) { + return null; + } + for (ShardRouting shardRouting : replicaSet) { + if (shardRouting.allocationId().getId().equals(allocationId)) { + return shardRouting; + } + } + return null; + } + /** * Returns the active primary shard for the given shard id or null if * no primary is found or the primary is not active. @@ -406,7 +420,7 @@ public class RoutingNodes implements Iterable { ensureMutable(); relocatingShards++; ShardRouting source = shard.relocate(nodeId, expectedShardSize); - ShardRouting target = source.buildTargetRelocatingShard(); + ShardRouting target = source.getTargetRelocatingShard(); updateAssigned(shard, source); node(target.currentNodeId()).add(target); assignedShardsAdd(target); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java index f43517ec559..6b7651b5bfc 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.Diffable; import org.elasticsearch.cluster.DiffableUtils; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -145,6 +146,26 @@ public class RoutingTable implements Iterable, Diffable, DiffableemptyList())); @@ -278,7 +299,7 @@ public class RoutingTable implements Iterable, Diffable asList; private final long expectedShardSize; + @Nullable + private final ShardRouting targetRelocatingShard; /** * A constructor to internally create shard routing instances, note, the internal flag should only be set to true @@ -74,11 +76,22 @@ public final class ShardRouting implements Writeable, ToXContent { this.unassignedInfo = unassignedInfo; this.allocationId = allocationId; this.expectedShardSize = expectedShardSize; + this.targetRelocatingShard = initializeTargetRelocatingShard(); assert expectedShardSize == UNAVAILABLE_EXPECTED_SHARD_SIZE || state == ShardRoutingState.INITIALIZING || state == ShardRoutingState.RELOCATING : expectedShardSize + " state: " + state; assert expectedShardSize >= 0 || state != ShardRoutingState.INITIALIZING || state != ShardRoutingState.RELOCATING : expectedShardSize + " state: " + state; assert !(state == ShardRoutingState.UNASSIGNED && unassignedInfo == null) : "unassigned shard must be created with meta"; } + @Nullable + private ShardRouting initializeTargetRelocatingShard() { + if (state == ShardRoutingState.RELOCATING) { + return new ShardRouting(shardId, relocatingNodeId, currentNodeId, restoreSource, primary, + ShardRoutingState.INITIALIZING, unassignedInfo, AllocationId.newTargetRelocation(allocationId), expectedShardSize); + } else { + return null; + } + } + /** * Creates a new unassigned shard. */ @@ -177,14 +190,13 @@ public final class ShardRouting implements Writeable, ToXContent { } /** - * Creates a shard routing representing the target shard. + * Returns a shard routing representing the target shard. * The target shard routing will be the INITIALIZING state and have relocatingNodeId set to the * source node. */ - public ShardRouting buildTargetRelocatingShard() { + public ShardRouting getTargetRelocatingShard() { assert relocating(); - return new ShardRouting(shardId, relocatingNodeId, currentNodeId, restoreSource, primary, ShardRoutingState.INITIALIZING, unassignedInfo, - AllocationId.newTargetRelocation(allocationId), expectedShardSize); + return targetRelocatingShard; } /** @@ -282,6 +294,7 @@ public final class ShardRouting implements Writeable, ToXContent { } expectedShardSize = shardSize; asList = Collections.singletonList(this); + targetRelocatingShard = initializeTargetRelocatingShard(); } public ShardRouting(StreamInput in) throws IOException { @@ -453,7 +466,7 @@ public final class ShardRouting implements Writeable, ToXContent { } /** - * Returns true if this shard is a relocation target for another shard (i.e., was created with {@link #buildTargetRelocatingShard()} + * Returns true if this shard is a relocation target for another shard (i.e., was created with {@link #initializeTargetRelocatingShard()} */ public boolean isRelocationTarget() { return state == ShardRoutingState.INITIALIZING && relocatingNodeId != null; diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index f984e8b4f1e..f58bc22c63f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation.Result; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; @@ -84,23 +85,25 @@ public class AllocationService extends AbstractComponent { } /** - * Applies the started shards. Note, shards can be called several times within this method. + * Applies the started shards. Note, only initializing ShardRouting instances that exist in the routing table should be + * provided as parameter and no duplicates should be contained. *

* If the same instance of the routing table is returned, then no change has been made.

*/ - public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List startedShards) { + public Result applyStartedShards(ClusterState clusterState, List startedShards) { return applyStartedShards(clusterState, startedShards, true); } - public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List startedShards, boolean withReroute) { + public Result applyStartedShards(ClusterState clusterState, List startedShards, boolean withReroute) { + if (startedShards.isEmpty()) { + return new Result(false, clusterState.routingTable(), clusterState.metaData()); + } RoutingNodes routingNodes = getMutableRoutingNodes(clusterState); // shuffle the unassigned nodes, just so we won't have things like poison failed shards routingNodes.unassigned().shuffle(); - StartedRerouteAllocation allocation = new StartedRerouteAllocation(allocationDeciders, routingNodes, clusterState, startedShards, clusterInfoService.getClusterInfo(), currentNanoTime()); - boolean changed = applyStartedShards(allocation, startedShards); - if (!changed) { - return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); - } + StartedRerouteAllocation allocation = new StartedRerouteAllocation(allocationDeciders, routingNodes, clusterState, startedShards, + clusterInfoService.getClusterInfo(), currentNanoTime()); + applyStartedShards(allocation, startedShards); gatewayAllocator.applyStartedShards(allocation); if (withReroute) { reroute(allocation); @@ -109,12 +112,12 @@ public class AllocationService extends AbstractComponent { return buildResultAndLogHealthChange(allocation, "shards started [" + startedShardsAsString + "] ..."); } - protected RoutingAllocation.Result buildResultAndLogHealthChange(RoutingAllocation allocation, String reason) { + protected Result buildResultAndLogHealthChange(RoutingAllocation allocation, String reason) { return buildResultAndLogHealthChange(allocation, reason, new RoutingExplanations()); } - protected RoutingAllocation.Result buildResultAndLogHealthChange(RoutingAllocation allocation, String reason, RoutingExplanations explanations) { + protected Result buildResultAndLogHealthChange(RoutingAllocation allocation, String reason, RoutingExplanations explanations) { MetaData oldMetaData = allocation.metaData(); RoutingTable oldRoutingTable = allocation.routingTable(); RoutingNodes newRoutingNodes = allocation.routingNodes(); @@ -128,7 +131,7 @@ public class AllocationService extends AbstractComponent { metaData(newMetaData).routingTable(newRoutingTable).build()), reason ); - return new RoutingAllocation.Result(true, newRoutingTable, newMetaData, explanations); + return new Result(true, newRoutingTable, newMetaData, explanations); } /** @@ -186,7 +189,7 @@ public class AllocationService extends AbstractComponent { // we do not use newPrimary.isTargetRelocationOf(oldPrimary) because that one enforces newPrimary to // be initializing. However, when the target shard is activated, we still want the primary term to staty // the same - (oldPrimary.relocating() && newPrimary.isSameAllocation(oldPrimary.buildTargetRelocatingShard()))) { + (oldPrimary.relocating() && newPrimary.isSameAllocation(oldPrimary.getTargetRelocatingShard()))) { // do nothing } else { // incrementing the primary term @@ -210,37 +213,44 @@ public class AllocationService extends AbstractComponent { } } - public RoutingAllocation.Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) { + public Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) { return applyFailedShards(clusterState, Collections.singletonList(new FailedRerouteAllocation.FailedShard(failedShard, null, null))); } /** - * Applies the failed shards. Note, shards can be called several times within this method. + * Applies the failed shards. Note, only assigned ShardRouting instances that exist in the routing table should be + * provided as parameter and no duplicates should be contained. + * *

* If the same instance of the routing table is returned, then no change has been made.

*/ - public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List failedShards) { + public Result applyFailedShards(ClusterState clusterState, List failedShards) { + if (failedShards.isEmpty()) { + return new Result(false, clusterState.routingTable(), clusterState.metaData()); + } RoutingNodes routingNodes = getMutableRoutingNodes(clusterState); // shuffle the unassigned nodes, just so we won't have things like poison failed shards routingNodes.unassigned().shuffle(); long currentNanoTime = currentNanoTime(); - FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState, failedShards, clusterInfoService.getClusterInfo(), currentNanoTime); - boolean changed = false; - // as failing primaries also fail associated replicas, we fail replicas first here so that their nodes are added to ignore list + FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState, failedShards, + clusterInfoService.getClusterInfo(), currentNanoTime); + + // as failing primaries also fail associated replicas, we fail replicas first here to avoid re-resolving replica ShardRouting List orderedFailedShards = new ArrayList<>(failedShards); - orderedFailedShards.sort(Comparator.comparing(failedShard -> failedShard.shard.primary())); - for (FailedRerouteAllocation.FailedShard failedShard : orderedFailedShards) { - UnassignedInfo unassignedInfo = failedShard.shard.unassignedInfo(); - final int failedAllocations = unassignedInfo != null ? unassignedInfo.getNumFailedAllocations() : 0; - changed |= applyFailedShard(allocation, failedShard.shard, true, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShard.message, failedShard.failure, - failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false, AllocationStatus.NO_ATTEMPT)); - } - if (!changed) { - return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); + orderedFailedShards.sort(Comparator.comparing(failedShard -> failedShard.routingEntry.primary())); + + for (FailedRerouteAllocation.FailedShard failedShardEntry : orderedFailedShards) { + ShardRouting failedShard = failedShardEntry.routingEntry; + final int failedAllocations = failedShard.unassignedInfo() != null ? failedShard.unassignedInfo().getNumFailedAllocations() : 0; + UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShardEntry.message, + failedShardEntry.failure, failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false, + AllocationStatus.NO_ATTEMPT); + allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId()); + applyFailedShard(allocation, failedShard, unassignedInfo); } gatewayAllocator.applyFailedShards(allocation); reroute(allocation); - String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.shard.shardId().toString()); + String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.routingEntry.shardId().toString()); return buildResultAndLogHealthChange(allocation, "shards failed [" + failedShardsAsString + "] ..."); } @@ -259,9 +269,9 @@ public class AllocationService extends AbstractComponent { metaData.getIndexSafe(shardRouting.index()).getSettings()); if (newComputedLeftDelayNanos == 0) { changed = true; - unassignedIterator.updateUnassignedInfo(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(), - unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), unassignedInfo.getUnassignedTimeInMillis(), false, - unassignedInfo.getLastAllocationStatus())); + unassignedIterator.updateUnassignedInfo(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), + unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), + unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus())); } } } @@ -285,7 +295,7 @@ public class AllocationService extends AbstractComponent { .collect(Collectors.joining(", ")); } - public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCommands commands, boolean explain, boolean retryFailed) { + public Result reroute(ClusterState clusterState, AllocationCommands commands, boolean explain, boolean retryFailed) { RoutingNodes routingNodes = getMutableRoutingNodes(clusterState); // we don't shuffle the unassigned shards here, to try and get as close as possible to // a consistent result of the effect the commands have on the routing @@ -311,7 +321,7 @@ public class AllocationService extends AbstractComponent { *

* If the same instance of the routing table is returned, then no change has been made. */ - public RoutingAllocation.Result reroute(ClusterState clusterState, String reason) { + public Result reroute(ClusterState clusterState, String reason) { return reroute(clusterState, reason, false); } @@ -320,7 +330,7 @@ public class AllocationService extends AbstractComponent { *

* If the same instance of the routing table is returned, then no change has been made. */ - protected RoutingAllocation.Result reroute(ClusterState clusterState, String reason, boolean debug) { + protected Result reroute(ClusterState clusterState, String reason, boolean debug) { RoutingNodes routingNodes = getMutableRoutingNodes(clusterState); // shuffle the unassigned nodes, just so we won't have things like poison failed shards routingNodes.unassigned().shuffle(); @@ -328,7 +338,7 @@ public class AllocationService extends AbstractComponent { clusterInfoService.getClusterInfo(), currentNanoTime(), false); allocation.debugDecision(debug); if (!reroute(allocation)) { - return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); + return new Result(false, clusterState.routingTable(), clusterState.metaData()); } return buildResultAndLogHealthChange(allocation, reason); } @@ -420,7 +430,7 @@ public class AllocationService extends AbstractComponent { boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0; UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]", null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT); - applyFailedShard(allocation, shardRouting, false, unassignedInfo); + applyFailedShard(allocation, shardRouting, unassignedInfo); } // its a dead node, remove it, note, its important to remove it *after* we apply failed shard // since it relies on the fact that the RoutingNode exists in the list of nodes @@ -429,111 +439,70 @@ public class AllocationService extends AbstractComponent { return changed; } - private boolean failReplicasForUnassignedPrimary(RoutingAllocation allocation, ShardRouting primary) { + private boolean failReplicasForUnassignedPrimary(RoutingAllocation allocation, ShardRouting failedPrimary) { + assert failedPrimary.primary() : "can only fail replicas for primary shard: " + failedPrimary; List replicas = new ArrayList<>(); - for (ShardRouting routing : allocation.routingNodes().assignedShards(primary.shardId())) { + for (ShardRouting routing : allocation.routingNodes().assignedShards(failedPrimary.shardId())) { if (!routing.primary() && routing.initializing()) { replicas.add(routing); } } - boolean changed = false; - for (ShardRouting routing : replicas) { - changed |= applyFailedShard(allocation, routing, false, - new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED, "primary failed while replica initializing", - null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false, - AllocationStatus.NO_ATTEMPT)); + for (ShardRouting failedReplica : replicas) { + UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED, + "primary failed while replica initializing", null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false, + AllocationStatus.NO_ATTEMPT); + applyFailedShard(allocation, failedReplica, unassignedInfo); } - return changed; + return replicas.isEmpty() == false; } - private boolean applyStartedShards(RoutingAllocation routingAllocation, Iterable startedShardEntries) { - boolean dirty = false; - // apply shards might be called several times with the same shard, ignore it + private void applyStartedShards(RoutingAllocation routingAllocation, List startedShardEntries) { + assert startedShardEntries.isEmpty() == false : "non-empty list of started shard entries expected"; RoutingNodes routingNodes = routingAllocation.routingNodes(); for (ShardRouting startedShard : startedShardEntries) { - assert startedShard.initializing(); + assert startedShard.initializing() : "only initializing shards can be started"; + assert routingAllocation.metaData().index(startedShard.shardId().getIndex()) != null : + "shard started for unknown index (shard entry: " + startedShard + ")"; + assert startedShard == routingNodes.getByAllocationId(startedShard.shardId(), startedShard.allocationId().getId()) : + "shard routing to start does not exist in routing table, expected: " + startedShard + " but was: " + + routingNodes.getByAllocationId(startedShard.shardId(), startedShard.allocationId().getId()); - // validate index still exists. strictly speaking this is not needed but it gives clearer logs - if (routingAllocation.metaData().index(startedShard.index()) == null) { - logger.debug("{} ignoring shard started, unknown index (routing: {})", startedShard.shardId(), startedShard); - continue; - } + routingNodes.started(startedShard); + logger.trace("{} marked shard as started (routing: {})", startedShard.shardId(), startedShard); - RoutingNode currentRoutingNode = routingNodes.node(startedShard.currentNodeId()); - if (currentRoutingNode == null) { - logger.debug("{} failed to find shard in order to start it [failed to find node], ignoring (routing: {})", startedShard.shardId(), startedShard); - continue; - } - - ShardRouting matchingShard = currentRoutingNode.getByShardId(startedShard.shardId()); - if (matchingShard == null) { - logger.debug("{} failed to find shard in order to start it [failed to find shard], ignoring (routing: {})", startedShard.shardId(), startedShard); - } else if (matchingShard.isSameAllocation(startedShard) == false) { - logger.debug("{} failed to find shard with matching allocation id in order to start it [failed to find matching shard], ignoring (routing: {}, matched shard routing: {})", startedShard.shardId(), startedShard, matchingShard); - } else { - startedShard = matchingShard; - if (startedShard.active()) { - logger.trace("{} shard is already started, ignoring (routing: {})", startedShard.shardId(), startedShard); - } else { - assert startedShard.initializing(); - dirty = true; - routingNodes.started(startedShard); - logger.trace("{} marked shard as started (routing: {})", startedShard.shardId(), startedShard); - - if (startedShard.relocatingNodeId() != null) { - // relocation target has been started, remove relocation source - RoutingNode relocationSourceNode = routingNodes.node(startedShard.relocatingNodeId()); - ShardRouting relocationSourceShard = relocationSourceNode.getByShardId(startedShard.shardId()); - assert relocationSourceShard.isRelocationSourceOf(startedShard); - routingNodes.remove(relocationSourceShard); - } - } + if (startedShard.relocatingNodeId() != null) { + // relocation target has been started, remove relocation source + RoutingNode relocationSourceNode = routingNodes.node(startedShard.relocatingNodeId()); + ShardRouting relocationSourceShard = relocationSourceNode.getByShardId(startedShard.shardId()); + assert relocationSourceShard.isRelocationSourceOf(startedShard); + assert relocationSourceShard.getTargetRelocatingShard() == startedShard : "relocation target mismatch, expected: " + + startedShard + " but was: " + relocationSourceShard.getTargetRelocatingShard(); + routingNodes.remove(relocationSourceShard); } } - return dirty; } /** - * Applies the relevant logic to handle a failed shard. Returns true if changes happened that - * require relocation. + * Applies the relevant logic to handle a failed shard. */ - private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting failedShard, boolean addToIgnoreList, UnassignedInfo unassignedInfo) { - IndexRoutingTable indexRoutingTable = allocation.routingTable().index(failedShard.index()); - if (indexRoutingTable == null) { - logger.debug("{} ignoring shard failure, unknown index in {} ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary()); - return false; - } + private void applyFailedShard(RoutingAllocation allocation, ShardRouting failedShard, UnassignedInfo unassignedInfo) { RoutingNodes routingNodes = allocation.routingNodes(); + assert failedShard.assignedToNode() : "only assigned shards can be failed"; + assert allocation.metaData().index(failedShard.shardId().getIndex()) != null : + "shard failed for unknown index (shard entry: " + failedShard + ")"; + assert routingNodes.getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId()) == failedShard : + "shard routing to fail does not exist in routing table, expected: " + failedShard + " but was: " + + routingNodes.getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId()); - RoutingNode matchedNode = routingNodes.node(failedShard.currentNodeId()); - if (matchedNode == null) { - logger.debug("{} ignoring shard failure, unknown node in {} ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary()); - return false; - } - - ShardRouting matchedShard = matchedNode.getByShardId(failedShard.shardId()); - if (matchedShard != null && matchedShard.isSameAllocation(failedShard)) { - logger.debug("{} failed shard {} found in routingNodes, failing it ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary()); - // replace incoming instance to make sure we work on the latest one - failedShard = matchedShard; - } else { - logger.debug("{} ignoring shard failure, unknown allocation id in {} ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary()); - return false; - } - + logger.debug("{} failing shard {} with unassigned info ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary()); if (failedShard.primary()) { // fail replicas first otherwise we move RoutingNodes into an inconsistent state failReplicasForUnassignedPrimary(allocation, failedShard); } - if (addToIgnoreList) { - // make sure we ignore this shard on the relevant node - allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId()); - } - cancelShard(logger, failedShard, unassignedInfo, routingNodes); - assert matchedNode.getByShardId(failedShard.shardId()) == null : "failedShard " + failedShard + " was matched but wasn't removed"; - return true; + assert routingNodes.node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) == null : "failedShard " + failedShard + + " was matched but wasn't removed"; } public static void cancelShard(ESLogger logger, ShardRouting cancelledShard, UnassignedInfo unassignedInfo, RoutingNodes routingNodes) { @@ -544,11 +513,13 @@ public class AllocationService extends AbstractComponent { // The shard is a target of a relocating shard. In that case we only // need to remove the target shard and cancel the source relocation. // No shard is left unassigned - logger.trace("{} is a relocation target, resolving source to cancel relocation ({})", cancelledShard, unassignedInfo.shortSummary()); + logger.trace("{} is a relocation target, resolving source to cancel relocation ({})", cancelledShard, + unassignedInfo.shortSummary()); RoutingNode sourceNode = routingNodes.node(cancelledShard.relocatingNodeId()); ShardRouting sourceShard = sourceNode.getByShardId(cancelledShard.shardId()); assert sourceShard.isRelocationSourceOf(cancelledShard); - logger.trace("{}, resolved source to [{}]. canceling relocation ... ({})", cancelledShard.shardId(), sourceShard, unassignedInfo.shortSummary()); + logger.trace("{}, resolved source to [{}]. canceling relocation ... ({})", cancelledShard.shardId(), sourceShard, + unassignedInfo.shortSummary()); routingNodes.cancelRelocation(sourceShard); routingNodes.remove(cancelledShard); } else { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java index c3a397a785b..154acb43bb8 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.index.shard.ShardId; import java.util.List; @@ -39,25 +40,28 @@ public class FailedRerouteAllocation extends RoutingAllocation { * details on why it failed. */ public static class FailedShard { - public final ShardRouting shard; + public final ShardRouting routingEntry; public final String message; public final Exception failure; - public FailedShard(ShardRouting shard, String message, Exception failure) { - this.shard = shard; + public FailedShard(ShardRouting routingEntry, String message, Exception failure) { + assert routingEntry.assignedToNode() : "only assigned shards can be failed " + routingEntry; + this.routingEntry = routingEntry; this.message = message; this.failure = failure; } @Override public String toString() { - return "failed shard, shard " + shard + ", message [" + message + "], failure [" + ExceptionsHelper.detailedMessage(failure) + "]"; + return "failed shard, shard " + routingEntry + ", message [" + message + "], failure [" + + ExceptionsHelper.detailedMessage(failure) + "]"; } } private final List failedShards; - public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, List failedShards, ClusterInfo clusterInfo, long currentNanoTime) { + public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, + List failedShards, ClusterInfo clusterInfo, long currentNanoTime) { super(deciders, routingNodes, clusterState, clusterInfo, currentNanoTime, false); this.failedShards = failedShards; } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java index f58ff54fc14..d26b976e6be 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java @@ -150,7 +150,8 @@ public class RoutingAllocation { * @param clusterState cluster state before rerouting * @param currentNanoTime the nano time to use for all delay allocation calculation (typically {@link System#nanoTime()}) */ - public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, ClusterInfo clusterInfo, long currentNanoTime, boolean retryFailed) { + public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, ClusterInfo clusterInfo, + long currentNanoTime, boolean retryFailed) { this.deciders = deciders; this.routingNodes = routingNodes; this.metaData = clusterState.metaData(); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java index 4d1ac1408a2..e63ce2b19e9 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java @@ -33,9 +33,10 @@ import java.util.List; */ public class StartedRerouteAllocation extends RoutingAllocation { - private final List startedShards; + private final List startedShards; - public StartedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, List startedShards, ClusterInfo clusterInfo, long currentNanoTime) { + public StartedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, + List startedShards, ClusterInfo clusterInfo, long currentNanoTime) { super(deciders, routingNodes, clusterState, clusterInfo, currentNanoTime, false); this.startedShards = startedShards; } @@ -44,7 +45,7 @@ public class StartedRerouteAllocation extends RoutingAllocation { * Get started shards * @return list of started shards */ - public List startedShards() { + public List startedShards() { return startedShards; } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index f2ab421ee5e..b880b04f3da 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -188,11 +188,11 @@ public class ThrottlingAllocationDecider extends AllocationDecider { } else if (shardRouting.relocating()) { initializingShard = shardRouting.cancelRelocation() .relocate(currentNodeId, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE) - .buildTargetRelocatingShard(); + .getTargetRelocatingShard(); } else { assert shardRouting.started(); initializingShard = shardRouting.relocate(currentNodeId, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE) - .buildTargetRelocatingShard(); + .getTargetRelocatingShard(); } assert initializingShard.initializing(); return initializingShard; diff --git a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index f074a3ec09c..ab6f6ae3ed2 100644 --- a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -124,8 +124,8 @@ public class GatewayAllocator extends AbstractComponent { public void applyFailedShards(FailedRerouteAllocation allocation) { for (FailedRerouteAllocation.FailedShard shard : allocation.failedShards()) { - Releasables.close(asyncFetchStarted.remove(shard.shard.shardId())); - Releasables.close(asyncFetchStore.remove(shard.shard.shardId())); + Releasables.close(asyncFetchStarted.remove(shard.routingEntry.shardId())); + Releasables.close(asyncFetchStore.remove(shard.routingEntry.shardId())); } } diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 6b2997b1a97..fd06175e28c 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -216,7 +216,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple if (masterNode != null) { // TODO: can we remove this? Is resending shard failures the responsibility of shardStateAction? String message = "master " + masterNode + " has not removed previously failed shard. resending shard failure"; logger.trace("[{}] re-sending failed shard [{}], reason [{}]", matchedRouting.shardId(), matchedRouting, message); - shardStateAction.shardFailed(matchedRouting, matchedRouting, message, null, SHARD_STATE_ACTION_LISTENER); + shardStateAction.localShardFailed(matchedRouting, message, null, SHARD_STATE_ACTION_LISTENER); } } } @@ -686,7 +686,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple try { logger.warn("[{}] marking and sending shard failed due to [{}]", failure, shardRouting.shardId(), message); failedShardsCache.put(shardRouting.shardId(), shardRouting); - shardStateAction.shardFailed(shardRouting, shardRouting, message, failure, SHARD_STATE_ACTION_LISTENER); + shardStateAction.localShardFailed(shardRouting, message, failure, SHARD_STATE_ACTION_LISTENER); } catch (Exception inner) { if (failure != null) inner.addSuppressed(failure); logger.warn( diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 8353f6dbacc..2e440d921ee 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -76,7 +76,7 @@ public class ReplicationOperationTests extends ESTestCase { // simulate execution of the replication phase on the relocation target node after relocation source was marked as relocated state = ClusterState.builder(state) .nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build(); - primaryShard = primaryShard.buildTargetRelocatingShard(); + primaryShard = primaryShard.getTargetRelocatingShard(); } final Set expectedReplicas = getExpectedReplicas(shardId, state); @@ -161,7 +161,7 @@ public class ReplicationOperationTests extends ESTestCase { // simulate execution of the replication phase on the relocation target node after relocation source was marked as relocated state = ClusterState.builder(state) .nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build(); - primaryShard = primaryShard.buildTargetRelocatingShard(); + primaryShard = primaryShard.getTargetRelocatingShard(); } final Set expectedReplicas = getExpectedReplicas(shardId, state); @@ -175,7 +175,7 @@ public class ReplicationOperationTests extends ESTestCase { final ClusterState finalState = state; final TestReplicaProxy replicasProxy = new TestReplicaProxy(expectedFailures) { @Override - public void failShard(ShardRouting replica, ShardRouting primary, String message, Exception exception, + public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { assertThat(replica, equalTo(failedReplica)); @@ -311,7 +311,7 @@ public class ReplicationOperationTests extends ESTestCase { } if (shardRouting.relocating() && localNodeId.equals(shardRouting.relocatingNodeId()) == false) { - expectedReplicas.add(shardRouting.buildTargetRelocatingShard()); + expectedReplicas.add(shardRouting.getTargetRelocatingShard()); } } } @@ -422,7 +422,7 @@ public class ReplicationOperationTests extends ESTestCase { } @Override - public void failShard(ShardRouting replica, ShardRouting primary, String message, Exception exception, Runnable onSuccess, + public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { if (failedReplicas.add(replica) == false) { fail("replica [" + replica + "] was failed twice"); diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index bca17fb143b..9d8bf87757e 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -534,15 +534,16 @@ public class TransportReplicationActionTests extends ESTestCase { AtomicReference failure = new AtomicReference<>(); AtomicReference ignoredFailure = new AtomicReference<>(); AtomicBoolean success = new AtomicBoolean(); - proxy.failShard(replica, shardRoutings.primaryShard(), "test", new ElasticsearchException("simulated"), + proxy.failShard(replica, randomIntBetween(0, 10), "test", new ElasticsearchException("simulated"), () -> success.set(true), failure::set, ignoredFailure::set ); CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear(); assertEquals(1, shardFailedRequests.length); CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0]; - ShardStateAction.ShardRoutingEntry shardRoutingEntry = (ShardStateAction.ShardRoutingEntry) shardFailedRequest.request; + ShardStateAction.ShardEntry shardEntry = (ShardStateAction.ShardEntry) shardFailedRequest.request; // the shard the request was sent to and the shard to be failed should be the same - assertEquals(shardRoutingEntry.getShardRouting(), replica); + assertEquals(shardEntry.getShardId(), replica.shardId()); + assertEquals(shardEntry.getAllocationId(), replica.allocationId().getId()); if (randomBoolean()) { // simulate success transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE); @@ -553,7 +554,7 @@ public class TransportReplicationActionTests extends ESTestCase { } else if (randomBoolean()) { // simulate the primary has been demoted transport.handleRemoteError(shardFailedRequest.requestId, - new ShardStateAction.NoLongerPrimaryShardException(shardRoutingEntry.getShardRouting().shardId(), + new ShardStateAction.NoLongerPrimaryShardException(replica.shardId(), "shard-failed-test")); assertFalse(success.get()); assertNotNull(failure.get()); diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java index d12b6b563b3..31197e0a9a4 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardIterator; @@ -51,7 +50,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -79,7 +77,8 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa .build()); numberOfReplicas = randomIntBetween(2, 16); metaData = MetaData.builder() - .put(IndexMetaData.builder(INDEX).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(numberOfReplicas)) + .put(IndexMetaData.builder(INDEX).settings(settings(Version.CURRENT)) + .numberOfShards(1).numberOfReplicas(numberOfReplicas).primaryTerm(0, randomIntBetween(2, 10))) .build(); routingTable = RoutingTable.builder() .addAsNew(metaData.index(INDEX)) @@ -89,8 +88,8 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa } public void testEmptyTaskListProducesSameClusterState() throws Exception { - List tasks = Collections.emptyList(); - ClusterStateTaskExecutor.BatchResult result = + List tasks = Collections.emptyList(); + ClusterStateTaskExecutor.BatchResult result = executor.execute(clusterState, tasks); assertTasksSuccessful(tasks, result, clusterState, false); } @@ -98,35 +97,35 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa public void testDuplicateFailuresAreOkay() throws Exception { String reason = "test duplicate failures are okay"; ClusterState currentState = createClusterStateWithStartedShards(reason); - List tasks = createExistingShards(currentState, reason); - ClusterStateTaskExecutor.BatchResult result = executor.execute(currentState, tasks); + List tasks = createExistingShards(currentState, reason); + ClusterStateTaskExecutor.BatchResult result = executor.execute(currentState, tasks); assertTasksSuccessful(tasks, result, clusterState, true); } public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception { String reason = "test non existent shards are marked as successful"; ClusterState currentState = createClusterStateWithStartedShards(reason); - List tasks = createNonExistentShards(currentState, reason); - ClusterStateTaskExecutor.BatchResult result = executor.execute(clusterState, tasks); + List tasks = createNonExistentShards(currentState, reason); + ClusterStateTaskExecutor.BatchResult result = executor.execute(clusterState, tasks); assertTasksSuccessful(tasks, result, clusterState, false); } public void testTriviallySuccessfulTasksBatchedWithFailingTasks() throws Exception { String reason = "test trivially successful tasks batched with failing tasks"; ClusterState currentState = createClusterStateWithStartedShards(reason); - List failingTasks = createExistingShards(currentState, reason); - List nonExistentTasks = createNonExistentShards(currentState, reason); + List failingTasks = createExistingShards(currentState, reason); + List nonExistentTasks = createNonExistentShards(currentState, reason); ShardStateAction.ShardFailedClusterStateTaskExecutor failingExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger) { @Override RoutingAllocation.Result applyFailedShards(ClusterState currentState, List failedShards) { throw new RuntimeException("simulated applyFailedShards failure"); } }; - List tasks = new ArrayList<>(); + List tasks = new ArrayList<>(); tasks.addAll(failingTasks); tasks.addAll(nonExistentTasks); - ClusterStateTaskExecutor.BatchResult result = failingExecutor.execute(currentState, tasks); - Map taskResultMap = + ClusterStateTaskExecutor.BatchResult result = failingExecutor.execute(currentState, tasks); + Map taskResultMap = failingTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.failure(new RuntimeException("simulated applyFailedShards failure")))); taskResultMap.putAll(nonExistentTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success()))); assertTaskResults(taskResultMap, result, currentState, false); @@ -135,16 +134,20 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa public void testIllegalShardFailureRequests() throws Exception { String reason = "test illegal shard failure requests"; ClusterState currentState = createClusterStateWithStartedShards(reason); - List failingTasks = createExistingShards(currentState, reason); - List tasks = new ArrayList<>(); - for (ShardStateAction.ShardRoutingEntry failingTask : failingTasks) { - tasks.add(new ShardStateAction.ShardRoutingEntry(failingTask.getShardRouting(), randomInvalidSourceShard(currentState, failingTask.getShardRouting()), failingTask.message, failingTask.failure)); + List failingTasks = createExistingShards(currentState, reason); + List tasks = new ArrayList<>(); + for (ShardStateAction.ShardEntry failingTask : failingTasks) { + long primaryTerm = currentState.metaData().index(failingTask.shardId.getIndex()).primaryTerm(failingTask.shardId.id()); + tasks.add(new ShardStateAction.ShardEntry(failingTask.shardId, failingTask.allocationId, + randomIntBetween(1, (int) primaryTerm - 1), failingTask.message, failingTask.failure)); } - Map taskResultMap = + Map taskResultMap = tasks.stream().collect(Collectors.toMap( Function.identity(), - task -> ClusterStateTaskExecutor.TaskResult.failure(new ShardStateAction.NoLongerPrimaryShardException(task.getShardRouting().shardId(), "source shard [" + task.sourceShardRouting + "] is neither the local allocation nor the primary allocation")))); - ClusterStateTaskExecutor.BatchResult result = executor.execute(currentState, tasks); + task -> ClusterStateTaskExecutor.TaskResult.failure(new ShardStateAction.NoLongerPrimaryShardException(task.shardId, + "primary term [" + task.primaryTerm + "] did not match current primary term [" + + currentState.metaData().index(task.shardId.getIndex()).primaryTerm(task.shardId.id()) + "]")))); + ClusterStateTaskExecutor.BatchResult result = executor.execute(currentState, tasks); assertTaskResults(taskResultMap, result, currentState, false); } @@ -163,7 +166,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa return ClusterState.builder(stateAfterReroute).routingTable(afterStart).build(); } - private List createExistingShards(ClusterState currentState, String reason) { + private List createExistingShards(ClusterState currentState, String reason) { List shards = new ArrayList<>(); GroupShardsIterator shardGroups = currentState.routingTable().allAssignedShardsGrouped(new String[] { INDEX }, true); @@ -182,7 +185,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa return toTasks(currentState, shardsToFail, indexUUID, reason); } - private List createNonExistentShards(ClusterState currentState, String reason) { + private List createNonExistentShards(ClusterState currentState, String reason) { // add shards from a non-existent index String nonExistentIndexUUID = "non-existent"; Index index = new Index("non-existent", nonExistentIndexUUID); @@ -196,17 +199,14 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa nonExistentShards.add(nonExistentShardRouting(index, nodeIds, false)); } - List existingShards = createExistingShards(currentState, reason); - List shardsWithMismatchedAllocationIds = new ArrayList<>(); - for (ShardStateAction.ShardRoutingEntry existingShard : existingShards) { - ShardRouting sr = existingShard.getShardRouting(); - ShardRouting nonExistentShardRouting = - TestShardRouting.newShardRouting(sr.shardId(), sr.currentNodeId(), sr.relocatingNodeId(), sr.restoreSource(), sr.primary(), sr.state()); - shardsWithMismatchedAllocationIds.add(new ShardStateAction.ShardRoutingEntry(nonExistentShardRouting, nonExistentShardRouting, existingShard.message, existingShard.failure)); + List existingShards = createExistingShards(currentState, reason); + List shardsWithMismatchedAllocationIds = new ArrayList<>(); + for (ShardStateAction.ShardEntry existingShard : existingShards) { + shardsWithMismatchedAllocationIds.add(new ShardStateAction.ShardEntry(existingShard.shardId, UUIDs.randomBase64UUID(), 0L, existingShard.message, existingShard.failure)); } - List tasks = new ArrayList<>(); - nonExistentShards.forEach(shard -> tasks.add(new ShardStateAction.ShardRoutingEntry(shard, shard, reason, new CorruptIndexException("simulated", nonExistentIndexUUID)))); + List tasks = new ArrayList<>(); + nonExistentShards.forEach(shard -> tasks.add(new ShardStateAction.ShardEntry(shard.shardId(), shard.allocationId().getId(), 0L, reason, new CorruptIndexException("simulated", nonExistentIndexUUID)))); tasks.addAll(shardsWithMismatchedAllocationIds); return tasks; } @@ -216,41 +216,42 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa } private static void assertTasksSuccessful( - List tasks, - ClusterStateTaskExecutor.BatchResult result, + List tasks, + ClusterStateTaskExecutor.BatchResult result, ClusterState clusterState, boolean clusterStateChanged ) { - Map taskResultMap = + Map taskResultMap = tasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success())); assertTaskResults(taskResultMap, result, clusterState, clusterStateChanged); } private static void assertTaskResults( - Map taskResultMap, - ClusterStateTaskExecutor.BatchResult result, + Map taskResultMap, + ClusterStateTaskExecutor.BatchResult result, ClusterState clusterState, boolean clusterStateChanged ) { // there should be as many task results as tasks assertEquals(taskResultMap.size(), result.executionResults.size()); - for (Map.Entry entry : taskResultMap.entrySet()) { + for (Map.Entry entry : taskResultMap.entrySet()) { // every task should have a corresponding task result assertTrue(result.executionResults.containsKey(entry.getKey())); // the task results are as expected - assertEquals(entry.getValue().isSuccess(), result.executionResults.get(entry.getKey()).isSuccess()); + assertEquals(entry.getKey().toString(), entry.getValue().isSuccess(), result.executionResults.get(entry.getKey()).isSuccess()); } List shards = clusterState.getRoutingTable().allShards(); - for (Map.Entry entry : taskResultMap.entrySet()) { + for (Map.Entry entry : taskResultMap.entrySet()) { if (entry.getValue().isSuccess()) { - // the shard was successfully failed and so should not - // be in the routing table + // the shard was successfully failed and so should not be in the routing table for (ShardRouting shard : shards) { - if (entry.getKey().getShardRouting().allocationId() != null) { - assertThat(shard.allocationId(), not(equalTo(entry.getKey().getShardRouting().allocationId()))); + if (shard.assignedToNode()) { + assertFalse("entry key " + entry.getKey() + ", shard routing " + shard, + entry.getKey().getShardId().equals(shard.shardId()) && + entry.getKey().getAllocationId().equals(shard.allocationId().getId())); } } } else { @@ -268,50 +269,15 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa } } - private static List toTasks(ClusterState currentState, List shards, String indexUUID, String message) { + private static List toTasks(ClusterState currentState, List shards, String indexUUID, String message) { return shards .stream() - .map(shard -> new ShardStateAction.ShardRoutingEntry(shard, randomValidSourceShard(currentState, shard), message, new CorruptIndexException("simulated", indexUUID))) + .map(shard -> new ShardStateAction.ShardEntry( + shard.shardId(), + shard.allocationId().getId(), + randomBoolean() ? 0L : currentState.metaData().getIndexSafe(shard.index()).primaryTerm(shard.id()), + message, + new CorruptIndexException("simulated", indexUUID))) .collect(Collectors.toList()); } - - private static ShardRouting randomValidSourceShard(ClusterState currentState, ShardRouting shardRouting) { - // for the request node ID to be valid, either the request is - // from the node the shard is assigned to, or the request is - // from the node holding the primary shard - if (randomBoolean()) { - // request from local node - return shardRouting; - } else { - // request from primary node unless in the case of - // non-existent shards there is not one and we fallback to - // the local node - ShardRouting primaryNodeId = primaryShard(currentState, shardRouting); - return primaryNodeId != null ? primaryNodeId : shardRouting; - } - } - - private static ShardRouting randomInvalidSourceShard(ClusterState currentState, ShardRouting shardRouting) { - ShardRouting primaryShard = primaryShard(currentState, shardRouting); - Set shards = - currentState - .routingTable() - .allShards() - .stream() - .filter(shard -> !shard.isSameAllocation(shardRouting)) - .filter(shard -> !shard.isSameAllocation(primaryShard)) - .collect(Collectors.toSet()); - if (!shards.isEmpty()) { - return randomSubsetOf(1, shards.toArray(new ShardRouting[0])).get(0); - } else { - return - TestShardRouting.newShardRouting(shardRouting.shardId(), UUIDs.randomBase64UUID(random()), randomBoolean(), - randomFrom(ShardRoutingState.values())); - } - } - - private static ShardRouting primaryShard(ClusterState currentState, ShardRouting shardRouting) { - IndexShardRoutingTable indexShard = currentState.getRoutingTable().shardRoutingTableOrNull(shardRouting.shardId()); - return indexShard == null ? null : indexShard.primaryShard(); - } } diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index d387d6f7d43..762e7d9e75a 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -29,9 +29,7 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.ShardsIterator; -import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; @@ -61,6 +59,7 @@ import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.elasticsearch.test.ClusterServiceUtils.setState; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; public class ShardStateActionTests extends ESTestCase { @@ -89,9 +88,9 @@ public class ShardStateActionTests extends ESTestCase { } @Override - protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) { + protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, ShardEntry shardEntry, Listener listener) { onBeforeWaitForNewMasterAndRetry.run(); - super.waitForNewMasterAndRetry(actionName, observer, shardRoutingEntry, listener); + super.waitForNewMasterAndRetry(actionName, observer, shardEntry, listener); onAfterWaitForNewMasterAndRetry.run(); } } @@ -140,7 +139,7 @@ public class ShardStateActionTests extends ESTestCase { CountDownLatch latch = new CountDownLatch(1); ShardRouting shardRouting = getRandomShardRouting(index); - shardStateAction.shardFailed(shardRouting, shardRouting, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + shardStateAction.localShardFailed(shardRouting, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override public void onSuccess() { success.set(true); @@ -158,10 +157,11 @@ public class ShardStateActionTests extends ESTestCase { CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertEquals(1, capturedRequests.length); // the request is a shard failed request - assertThat(capturedRequests[0].request, is(instanceOf(ShardStateAction.ShardRoutingEntry.class))); - ShardStateAction.ShardRoutingEntry shardRoutingEntry = (ShardStateAction.ShardRoutingEntry) capturedRequests[0].request; + assertThat(capturedRequests[0].request, is(instanceOf(ShardStateAction.ShardEntry.class))); + ShardStateAction.ShardEntry shardEntry = (ShardStateAction.ShardEntry) capturedRequests[0].request; // for the right shard - assertEquals(shardRouting, shardRoutingEntry.getShardRouting()); + assertEquals(shardEntry.shardId, shardRouting.shardId()); + assertEquals(shardEntry.allocationId, shardRouting.allocationId().getId()); // sent to the master assertEquals(clusterService.state().nodes().getMasterNode().getId(), capturedRequests[0].node.getId()); @@ -188,7 +188,7 @@ public class ShardStateActionTests extends ESTestCase { }); ShardRouting failedShard = getRandomShardRouting(index); - shardStateAction.shardFailed(failedShard, failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override public void onSuccess() { success.set(true); @@ -237,7 +237,7 @@ public class ShardStateActionTests extends ESTestCase { setUpMasterRetryVerification(numberOfRetries, retries, latch, retryLoop); ShardRouting failedShard = getRandomShardRouting(index); - shardStateAction.shardFailed(failedShard, failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override public void onSuccess() { success.set(true); @@ -273,7 +273,7 @@ public class ShardStateActionTests extends ESTestCase { AtomicBoolean failure = new AtomicBoolean(); ShardRouting failedShard = getRandomShardRouting(index); - shardStateAction.shardFailed(failedShard, failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override public void onSuccess() { failure.set(false); @@ -305,7 +305,7 @@ public class ShardStateActionTests extends ESTestCase { ShardRouting failedShard = getRandomShardRouting(index); RoutingTable routingTable = RoutingTable.builder(clusterService.state().getRoutingTable()).remove(index).build(); setState(clusterService, ClusterState.builder(clusterService.state()).routingTable(routingTable)); - shardStateAction.shardFailed(failedShard, failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override public void onSuccess() { success.set(true); @@ -334,13 +334,12 @@ public class ShardStateActionTests extends ESTestCase { ShardRouting failedShard = getRandomShardRouting(index); - String nodeId = randomFrom(clusterService.state().nodes().getNodes().keys().toArray(String.class)); - AtomicReference failure = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - ShardRouting sourceFailedShard = TestShardRouting.newShardRouting(failedShard.shardId(), nodeId, randomBoolean(), randomFrom(ShardRoutingState.values())); - shardStateAction.shardFailed(failedShard, sourceFailedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + long primaryTerm = clusterService.state().metaData().index(index).primaryTerm(failedShard.id()); + assertThat(primaryTerm, greaterThanOrEqualTo(1L)); + shardStateAction.remoteShardFailed(failedShard, primaryTerm + 1, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override public void onSuccess() { failure.set(null); @@ -355,7 +354,7 @@ public class ShardStateActionTests extends ESTestCase { }); ShardStateAction.NoLongerPrimaryShardException catastrophicError = - new ShardStateAction.NoLongerPrimaryShardException(failedShard.shardId(), "source shard [" + sourceFailedShard + " is neither the local allocation nor the primary allocation"); + new ShardStateAction.NoLongerPrimaryShardException(failedShard.shardId(), "dummy failure"); CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); transport.handleRemoteError(capturedRequests[0].requestId, catastrophicError); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java index 2e368f322ee..9daee812193 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java @@ -23,7 +23,6 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; @@ -70,7 +69,7 @@ public class AllocationIdTests extends ESTestCase { assertThat(shard.allocationId().getId(), equalTo(allocationId.getId())); assertThat(shard.allocationId().getRelocationId(), notNullValue()); - ShardRouting target = shard.buildTargetRelocatingShard(); + ShardRouting target = shard.getTargetRelocatingShard(); assertThat(target.allocationId().getId(), equalTo(shard.allocationId().getRelocationId())); assertThat(target.allocationId().getRelocationId(), equalTo(shard.allocationId().getId())); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java index fa9133f6d36..a689acd04a5 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java @@ -23,7 +23,6 @@ import org.elasticsearch.Version; import org.elasticsearch.common.UUIDs; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.test.ESTestCase; @@ -86,7 +85,7 @@ public class ShardRoutingTests extends ESTestCase { assertFalse(startedShard1.isRelocationTarget()); ShardRouting sourceShard0a = startedShard0.relocate("node2", -1); assertFalse(sourceShard0a.isRelocationTarget()); - ShardRouting targetShard0a = sourceShard0a.buildTargetRelocatingShard(); + ShardRouting targetShard0a = sourceShard0a.getTargetRelocatingShard(); assertTrue(targetShard0a.isRelocationTarget()); ShardRouting sourceShard0b = startedShard0.relocate("node2", -1); ShardRouting sourceShard1 = startedShard1.relocate("node2", -1); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java index 5f4d6b5a8ba..0b9e20b3578 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -37,6 +37,8 @@ import org.elasticsearch.test.ESAllocationTestCase; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; @@ -218,9 +220,6 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), anyOf(equalTo("node1"), equalTo("node2"))); assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(1)); assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(UNASSIGNED)); - - logger.info("fail the shard again, check that nothing happens"); - assertThat(strategy.applyFailedShard(clusterState, shardToFail).changed(), equalTo(false)); } public void testFirstAllocationFailureSingleNode() { @@ -274,9 +273,6 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); } - - logger.info("fail the shard again, see that nothing happens"); - assertThat(strategy.applyFailedShard(clusterState, firstShard).changed(), equalTo(false)); } public void testSingleShardMultipleAllocationFailures() { @@ -317,11 +313,17 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { int shardsToFail = randomIntBetween(1, numberOfReplicas); ArrayList failedShards = new ArrayList<>(); RoutingNodes routingNodes = clusterState.getRoutingNodes(); + Set failedNodes = new HashSet<>(); + Set shardRoutingsToFail = new HashSet<>(); for (int i = 0; i < shardsToFail; i++) { - String n = "node" + Integer.toString(randomInt(numberOfReplicas)); - logger.info("failing shard on node [{}]", n); - ShardRouting shardToFail = routingNodes.node(n).iterator().next(); - failedShards.add(new FailedRerouteAllocation.FailedShard(shardToFail, null, null)); + String failedNode = "node" + Integer.toString(randomInt(numberOfReplicas)); + logger.info("failing shard on node [{}]", failedNode); + ShardRouting shardToFail = routingNodes.node(failedNode).iterator().next(); + if (shardRoutingsToFail.contains(shardToFail) == false) { + failedShards.add(new FailedRerouteAllocation.FailedShard(shardToFail, null, null)); + failedNodes.add(failedNode); + shardRoutingsToFail.add(shardToFail); + } } routingTable = strategy.applyFailedShards(clusterState, failedShards).routingTable(); @@ -329,8 +331,14 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.getRoutingNodes(); for (FailedRerouteAllocation.FailedShard failedShard : failedShards) { - if (!routingNodes.node(failedShard.shard.currentNodeId()).isEmpty()) { - fail("shard " + failedShard + " was re-assigned to it's node"); + if (routingNodes.getByAllocationId(failedShard.routingEntry.shardId(), failedShard.routingEntry.allocationId().getId()) != null) { + fail("shard " + failedShard + " was not failed"); + } + } + + for (String failedNode : failedNodes) { + if (!routingNodes.node(failedNode).isEmpty()) { + fail("shard was re-assigned to failed node " + failedNode); } } } @@ -390,9 +398,6 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); } - - logger.info("fail the shard again, see that nothing happens"); - assertThat(strategy.applyFailedShard(clusterState, firstShard).changed(), equalTo(false)); } public void testRebalanceFailure() { @@ -530,10 +535,6 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { clusterState = ClusterState.builder(clusterState).routingTable(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)).routingTable()).build(); assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1)); assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2)); - - // simulate another failure coming in, with the "old" shard routing, verify that nothing changes, and we ignore it - routingResult = allocation.applyFailedShard(clusterState, primaryShardToFail); - assertThat(routingResult.changed(), equalTo(false)); } public void testFailAllReplicasInitializingOnPrimaryFailWhileHavingAReplicaToElect() { @@ -575,9 +576,5 @@ public class FailedShardsRoutingTests extends ESAllocationTestCase { ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard(); assertThat(newPrimaryShard, not(equalTo(primaryShardToFail))); - - // simulate another failure coming in, with the "old" shard routing, verify that nothing changes, and we ignore it - routingResult = allocation.applyFailedShard(clusterState, primaryShardToFail); - assertThat(routingResult.changed(), equalTo(false)); } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java index 16a11a9c150..4e32cd1e2cc 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/StartedShardsRoutingTests.java @@ -24,7 +24,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; @@ -48,7 +47,7 @@ public class StartedShardsRoutingTests extends ESAllocationTestCase { logger.info("--> building initial cluster state"); final IndexMetaData indexMetaData = IndexMetaData.builder("test") .settings(settings(Version.CURRENT)) - .numberOfShards(3).numberOfReplicas(0) + .numberOfShards(2).numberOfReplicas(0) .build(); final Index index = indexMetaData.getIndex(); ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) @@ -56,69 +55,27 @@ public class StartedShardsRoutingTests extends ESAllocationTestCase { .metaData(MetaData.builder().put(indexMetaData, false)); final ShardRouting initShard = TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.INITIALIZING); - final ShardRouting startedShard = TestShardRouting.newShardRouting(new ShardId(index, 1), "node2", true, ShardRoutingState.STARTED); - final ShardRouting relocatingShard = TestShardRouting.newShardRouting(new ShardId(index, 2), "node1", "node2", true, ShardRoutingState.RELOCATING); + final ShardRouting relocatingShard = TestShardRouting.newShardRouting(new ShardId(index, 1), "node1", "node2", true, ShardRoutingState.RELOCATING); stateBuilder.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(index) .addIndexShard(new IndexShardRoutingTable.Builder(initShard.shardId()).addShard(initShard).build()) - .addIndexShard(new IndexShardRoutingTable.Builder(startedShard.shardId()).addShard(startedShard).build()) .addIndexShard(new IndexShardRoutingTable.Builder(relocatingShard.shardId()).addShard(relocatingShard).build())).build()); ClusterState state = stateBuilder.build(); logger.info("--> test starting of shard"); - RoutingAllocation.Result result = allocation.applyStartedShards(state, Arrays.asList( - TestShardRouting.newShardRouting(initShard.shardId(), initShard.currentNodeId(), initShard.relocatingNodeId(), initShard.primary(), - ShardRoutingState.INITIALIZING, initShard.allocationId())), false); + RoutingAllocation.Result result = allocation.applyStartedShards(state, Arrays.asList(initShard), false); assertTrue("failed to start " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); assertTrue(initShard + "isn't started \ncurrent routing table:" + result.routingTable().prettyPrint(), result.routingTable().index("test").shard(initShard.id()).allShardsStarted()); - logger.info("--> testing shard variants that shouldn't match the initializing shard"); - - result = allocation.applyStartedShards(state, Arrays.asList( - TestShardRouting.newShardRouting(initShard.shardId(), initShard.currentNodeId(), initShard.relocatingNodeId(), initShard.primary(), - ShardRoutingState.INITIALIZING)), false); - assertFalse("wrong allocation id flag shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); - - result = allocation.applyStartedShards(state, Arrays.asList( - TestShardRouting.newShardRouting(initShard.shardId(), "some_node", initShard.currentNodeId(), initShard.primary(), - ShardRoutingState.INITIALIZING, AllocationId.newTargetRelocation(AllocationId.newRelocation(initShard.allocationId())))), false); - assertFalse("relocating shard from node shouldn't start shard " + initShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); - - - - logger.info("--> testing double starting"); - - result = allocation.applyStartedShards(state, Arrays.asList( - TestShardRouting.newShardRouting(startedShard.shardId(), startedShard.currentNodeId(), startedShard.relocatingNodeId(), startedShard.primary(), - ShardRoutingState.INITIALIZING, startedShard.allocationId())), false); - assertFalse("duplicate starting of the same shard should be ignored \ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); - logger.info("--> testing starting of relocating shards"); - final AllocationId targetAllocationId = AllocationId.newTargetRelocation(relocatingShard.allocationId()); - result = allocation.applyStartedShards(state, Arrays.asList( - TestShardRouting.newShardRouting(relocatingShard.shardId(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), relocatingShard.primary(), - ShardRoutingState.INITIALIZING, targetAllocationId)), false); - + result = allocation.applyStartedShards(state, Arrays.asList(relocatingShard.getTargetRelocatingShard()), false); assertTrue("failed to start " + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); ShardRouting shardRouting = result.routingTable().index("test").shard(relocatingShard.id()).getShards().get(0); assertThat(shardRouting.state(), equalTo(ShardRoutingState.STARTED)); assertThat(shardRouting.currentNodeId(), equalTo("node2")); assertThat(shardRouting.relocatingNodeId(), nullValue()); - - logger.info("--> testing shard variants that shouldn't match the initializing relocating shard"); - - result = allocation.applyStartedShards(state, Arrays.asList( - TestShardRouting.newShardRouting(relocatingShard.shardId(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), relocatingShard.primary(), - ShardRoutingState.INITIALIZING))); - assertFalse("wrong allocation id shouldn't start shard" + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); - - result = allocation.applyStartedShards(state, Arrays.asList( - TestShardRouting.newShardRouting(relocatingShard.shardId(), relocatingShard.relocatingNodeId(), relocatingShard.currentNodeId(), relocatingShard.primary(), - ShardRoutingState.INITIALIZING, relocatingShard.allocationId())), false); - assertFalse("wrong allocation id shouldn't start shard even if relocatingId==shard.id" + relocatingShard + "\ncurrent routing table:" + result.routingTable().prettyPrint(), result.changed()); - } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index 56ca6381af9..7ede869f0a6 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -265,7 +265,7 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase { assertEquals(10L, DiskThresholdDecider.getExpectedShardSize(test_0, allocation, 0)); RoutingNode node = new RoutingNode("node1", new DiscoveryNode("node1", new LocalTransportAddress("test"), - emptyMap(), emptySet(), Version.CURRENT), test_0, test_1.buildTargetRelocatingShard(), test_2); + emptyMap(), emptySet(), Version.CURRENT), test_0, test_1.getTargetRelocatingShard(), test_2); assertEquals(100L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, false, "/dev/null")); assertEquals(90L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/null")); assertEquals(0L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/some/other/dev")); @@ -283,7 +283,7 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase { other_0 = ShardRoutingHelper.relocate(other_0, "node1"); node = new RoutingNode("node1", new DiscoveryNode("node1", new LocalTransportAddress("test"), - emptyMap(), emptySet(), Version.CURRENT), test_0, test_1.buildTargetRelocatingShard(), test_2, other_0.buildTargetRelocatingShard()); + emptyMap(), emptySet(), Version.CURRENT), test_0, test_1.getTargetRelocatingShard(), test_2, other_0.getTargetRelocatingShard()); if (other_0.primary()) { assertEquals(10100L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, false, "/dev/null")); assertEquals(10090L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/null")); diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index 466d3b4f83d..4ad3abc59b4 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -951,7 +951,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { NetworkPartition networkPartition = addRandomIsolation(isolatedNode); networkPartition.startDisrupting(); - service.shardFailed(failedShard, failedShard, "simulated", new CorruptIndexException("simulated", (String) null), new + service.localShardFailed(failedShard, "simulated", new CorruptIndexException("simulated", (String) null), new ShardStateAction.Listener() { @Override public void onSuccess() { diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java index cd2b4eaf2e4..2a0410f272b 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java @@ -563,7 +563,7 @@ public class NodeJoinControllerTests extends ESTestCase { } @Override - public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List startedShards, + public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List startedShards, boolean withReroute) { return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 27eb753dfe5..729bc549af4 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -477,7 +477,7 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase { } @Override - public void failShard(ShardRouting replica, ShardRouting primary, String message, Exception exception, Runnable onSuccess, + public void failShard(ShardRouting replica, long primaryTerm, String message, Exception exception, Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { throw new UnsupportedOperationException(); } diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 1a8caaa3514..152be45d558 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices.cluster; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest; import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction; @@ -41,6 +42,7 @@ import org.elasticsearch.action.support.master.TransportMasterNodeActionUtils; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.EmptyClusterInfoService; +import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.AliasValidator; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -53,12 +55,12 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.RandomAllocationDeciderTests; -import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; @@ -78,6 +80,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.stream.Collectors; import static com.carrotsearch.randomizedtesting.RandomizedTest.getRandom; import static org.elasticsearch.env.Environment.PATH_HOME_SETTING; @@ -91,10 +94,11 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class ClusterStateChanges { +public class ClusterStateChanges extends AbstractComponent { private final ClusterService clusterService; - private final AllocationService allocationService; + private final ShardStateAction.ShardFailedClusterStateTaskExecutor shardFailedClusterStateTaskExecutor; + private final ShardStateAction.ShardStartedClusterStateTaskExecutor shardStartedClusterStateTaskExecutor; // transport actions private final TransportCloseIndexAction transportCloseIndexAction; @@ -105,14 +109,16 @@ public class ClusterStateChanges { private final TransportCreateIndexAction transportCreateIndexAction; public ClusterStateChanges() { - Settings settings = Settings.builder().put(PATH_HOME_SETTING.getKey(), "dummy").build(); + super(Settings.builder().put(PATH_HOME_SETTING.getKey(), "dummy").build()); - allocationService = new AllocationService(settings, new AllocationDeciders(settings, + final AllocationService allocationService = new AllocationService(settings, new AllocationDeciders(settings, new HashSet<>(Arrays.asList(new SameShardAllocationDecider(settings), new ReplicaAfterPrimaryActiveAllocationDecider(settings), new RandomAllocationDeciderTests.RandomAllocationDecider(getRandom())))), NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE); + shardFailedClusterStateTaskExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger); + shardStartedClusterStateTaskExecutor = new ShardStateAction.ShardStartedClusterStateTaskExecutor(allocationService, logger); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ActionFilters actionFilters = new ActionFilters(Collections.emptySet()); IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(settings); @@ -199,13 +205,26 @@ public class ClusterStateChanges { } public ClusterState applyFailedShards(ClusterState clusterState, List failedShards) { - RoutingAllocation.Result rerouteResult = allocationService.applyFailedShards(clusterState, failedShards); - return ClusterState.builder(clusterState).routingResult(rerouteResult).build(); + List entries = failedShards.stream().map(failedShard -> + new ShardStateAction.ShardEntry(failedShard.routingEntry.shardId(), failedShard.routingEntry.allocationId().getId(), + 0L, failedShard.message, failedShard.failure)) + .collect(Collectors.toList()); + try { + return shardFailedClusterStateTaskExecutor.execute(clusterState, entries).resultingState; + } catch (Exception e) { + throw ExceptionsHelper.convertToRuntime(e); + } } public ClusterState applyStartedShards(ClusterState clusterState, List startedShards) { - RoutingAllocation.Result rerouteResult = allocationService.applyStartedShards(clusterState, startedShards); - return ClusterState.builder(clusterState).routingResult(rerouteResult).build(); + List entries = startedShards.stream().map(startedShard -> + new ShardStateAction.ShardEntry(startedShard.shardId(), startedShard.allocationId().getId(), 0L, "shard started", null)) + .collect(Collectors.toList()); + try { + return shardStartedClusterStateTaskExecutor.execute(clusterState, entries).resultingState; + } catch (Exception e) { + throw ExceptionsHelper.convertToRuntime(e); + } } private , Response extends ActionResponse> ClusterState execute(