diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index 38843e48d2c..57e1109978d 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -508,9 +508,11 @@ public class PrimaryAllocationIT extends ESIntegTestCase { final ShardId shardId = new ShardId(clusterService().state().metadata().index("test").getIndex(), 0); final Set<String> replicaNodes = new HashSet<>(internalCluster().startDataOnlyNodes(numberOfReplicas)); ensureGreen(); + String timeout = randomFrom("0s", "1s", "2s"); assertAcked( client(master).admin().cluster().prepareUpdateSettings() - .setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "none")).get()); + .setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "none")) + .setPersistentSettings(Settings.builder().put("indices.replication.retry_timeout", timeout)).get()); logger.info("--> Indexing with gap in seqno to ensure that some operations will be replayed in resync"); long numDocs = scaledRandomIntBetween(5, 50); for (int i = 0; i < numDocs; i++) { diff --git a/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java b/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java index 4629cd2e268..52e8fef6edd 100644 --- a/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java @@ -65,7 +65,7 @@ public abstract class RetryableAction<Response> { if (initialDelayMillis < 1) { throw new IllegalArgumentException("Initial delay was less than 1 millisecond: " + initialDelay); } - this.timeoutMillis = Math.max(timeoutValue.getMillis(), 1); + this.timeoutMillis = timeoutValue.getMillis(); this.startMillis = threadPool.relativeTimeInMillis(); this.finalListener = listener; this.executor = executor; diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/PendingReplicationActions.java b/server/src/main/java/org/elasticsearch/action/support/replication/PendingReplicationActions.java new file mode 100644 index 00000000000..385b12fb3ee --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/support/replication/PendingReplicationActions.java @@ -0,0 +1,114 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.action.support.replication; + +import org.elasticsearch.action.support.RetryableAction; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.index.shard.IndexShardClosedException; +import org.elasticsearch.index.shard.ReplicationGroup; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; + +public class PendingReplicationActions implements Consumer<ReplicationGroup>, Releasable { + + private final Map<String, Set<RetryableAction<?>>> onGoingReplicationActions = ConcurrentCollections.newConcurrentMap(); + private final ShardId shardId; + private final ThreadPool threadPool; + private volatile long replicationGroupVersion = -1; + + public PendingReplicationActions(ShardId shardId, ThreadPool threadPool) { + this.shardId = shardId; + this.threadPool = threadPool; + } + + public void addPendingAction(String allocationId, RetryableAction<?> replicationAction) { + Set<RetryableAction<?>> ongoingActionsOnNode = onGoingReplicationActions.get(allocationId); + if (ongoingActionsOnNode != null) { + ongoingActionsOnNode.add(replicationAction); + if (onGoingReplicationActions.containsKey(allocationId) == false) { + replicationAction.cancel(new IndexShardClosedException(shardId, + "Replica unavailable - replica could have left ReplicationGroup or IndexShard might have closed")); + } + } else { + replicationAction.cancel(new IndexShardClosedException(shardId, + "Replica unavailable - replica could have left ReplicationGroup or IndexShard might have closed")); + } + } + + public void removeReplicationAction(String allocationId, RetryableAction<?> action) { + Set<RetryableAction<?>> ongoingActionsOnNode = onGoingReplicationActions.get(allocationId); + if (ongoingActionsOnNode != null) { + ongoingActionsOnNode.remove(action); + } + } + + @Override + public void accept(ReplicationGroup replicationGroup) { + if (isNewerVersion(replicationGroup)) { + synchronized (this) { + if (isNewerVersion(replicationGroup)) { + acceptNewTrackedAllocationIds(replicationGroup.getTrackedAllocationIds()); + replicationGroupVersion = replicationGroup.getVersion(); + } + } + } + } + + private boolean isNewerVersion(ReplicationGroup replicationGroup) { + // Relative comparison to mitigate long overflow + return replicationGroup.getVersion() - replicationGroupVersion > 0; + } + + // Visible for testing + synchronized void acceptNewTrackedAllocationIds(Set<String> trackedAllocationIds) { + for (String targetAllocationId : trackedAllocationIds) { + onGoingReplicationActions.putIfAbsent(targetAllocationId, ConcurrentCollections.newConcurrentSet()); + } + ArrayList<Set<RetryableAction<?>>> toCancel = new ArrayList<>(); + for (String allocationId : onGoingReplicationActions.keySet()) { + if (trackedAllocationIds.contains(allocationId) == false) { + toCancel.add(onGoingReplicationActions.remove(allocationId)); + } + } + + cancelActions(toCancel, "Replica left ReplicationGroup"); + } + + @Override + public synchronized void close() { + ArrayList<Set<RetryableAction<?>>> toCancel = new ArrayList<>(onGoingReplicationActions.values()); + onGoingReplicationActions.clear(); + + cancelActions(toCancel, "Primary closed."); + } + + private void cancelActions(ArrayList<Set<RetryableAction<?>>> toCancel, String message) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> toCancel.stream() + .flatMap(Collection::stream) + .forEach(action -> action.cancel(new IndexShardClosedException(shardId, 
message)))); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 753f0d7b435..bd0a692d217 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -27,17 +27,23 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.RetryableAction; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ConnectTransportException; import java.io.IOException; import java.util.ArrayList; @@ -54,6 +60,7 @@ public class ReplicationOperation< PrimaryResultT extends ReplicationOperation.PrimaryResult<ReplicaRequest> > { private final Logger logger; + private final ThreadPool threadPool; private final Request request; private final String opType; private final AtomicInteger totalShards = new AtomicInteger(); @@ -72,6 +79,8 @@ public class ReplicationOperation< private final Primary<Request, ReplicaRequest, PrimaryResultT> primary; private final Replicas<ReplicaRequest> replicasProxy; private final AtomicBoolean finished = new AtomicBoolean(); + private final TimeValue initialRetryBackoffBound; + private final TimeValue retryTimeout; private final long primaryTerm; // exposed for tests @@ -84,14 +93,18 @@ public class ReplicationOperation< public ReplicationOperation(Request request, Primary<Request, ReplicaRequest, PrimaryResultT> primary, ActionListener<PrimaryResultT> listener, Replicas<ReplicaRequest> replicas, - Logger logger, String opType, long primaryTerm) { + Logger logger, ThreadPool threadPool, String opType, long primaryTerm, TimeValue initialRetryBackoffBound, + TimeValue retryTimeout) { this.replicasProxy = replicas; this.primary = primary; this.resultListener = listener; this.logger = logger; + this.threadPool = threadPool; this.request = request; this.opType = opType; this.primaryTerm = primaryTerm; + this.initialRetryBackoffBound = initialRetryBackoffBound; + this.retryTimeout = retryTimeout; } public void execute() throws Exception { @@ -130,8 +143,9 @@ public class ReplicationOperation< final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes(); assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized"; final ReplicationGroup replicationGroup = primary.getReplicationGroup(); + final PendingReplicationActions pendingReplicationActions = primary.getPendingReplicationActions(); markUnavailableShardsAsStale(replicaRequest, replicationGroup); - performOnReplicas(replicaRequest, 
globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup); + performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup, pendingReplicationActions); } primaryResult.runPostReplicationActions(new ActionListener<Void>() { @@ -165,7 +179,8 @@ public class ReplicationOperation< } private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint, - final long maxSeqNoOfUpdatesOrDeletes, final ReplicationGroup replicationGroup) { + final long maxSeqNoOfUpdatesOrDeletes, final ReplicationGroup replicationGroup, + final PendingReplicationActions pendingReplicationActions) { // for total stats, add number of unassigned shards and // number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target) totalShards.addAndGet(replicationGroup.getSkippedShards().size()); @@ -174,52 +189,78 @@ public class ReplicationOperation< for (final ShardRouting shard : replicationGroup.getReplicationTargets()) { if (shard.isSameAllocation(primaryRouting) == false) { - performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes); + performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, pendingReplicationActions); } } } private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest, - final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) { + final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes, + final PendingReplicationActions pendingReplicationActions) { if (logger.isTraceEnabled()) { logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest); } - totalShards.incrementAndGet(); pendingActions.incrementAndGet(); - replicasProxy.performOn(shard, replicaRequest, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, - new ActionListener<ReplicaResponse>() { - @Override - public void onResponse(ReplicaResponse response) { - successfulShards.incrementAndGet(); - try { - updateCheckPoints(shard, response::localCheckpoint, response::globalCheckpoint); - } finally { - decPendingAndFinishIfNeeded(); - } + final ActionListener<ReplicaResponse> replicationListener = new ActionListener<ReplicaResponse>() { + @Override + public void onResponse(ReplicaResponse response) { + successfulShards.incrementAndGet(); + try { + updateCheckPoints(shard, response::localCheckpoint, response::globalCheckpoint); + } finally { + decPendingAndFinishIfNeeded(); } + } - @Override - public void onFailure(Exception replicaException) { - logger.trace(() -> new ParameterizedMessage( - "[{}] failure while performing [{}] on replica {}, request [{}]", - shard.shardId(), opType, shard, replicaRequest), replicaException); - // Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report. 
- if (TransportActions.isShardNotAvailableException(replicaException) == false) { - RestStatus restStatus = ExceptionsHelper.status(replicaException); - shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure( - shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false)); - } - String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard); - replicasProxy.failShardIfNeeded(shard, primaryTerm, message, replicaException, - ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary)); + @Override + public void onFailure(Exception replicaException) { + logger.trace(() -> new ParameterizedMessage( + "[{}] failure while performing [{}] on replica {}, request [{}]", + shard.shardId(), opType, shard, replicaRequest), replicaException); + // Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report. + if (TransportActions.isShardNotAvailableException(replicaException) == false) { + RestStatus restStatus = ExceptionsHelper.status(replicaException); + shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure( + shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false)); } + String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard); + replicasProxy.failShardIfNeeded(shard, primaryTerm, message, replicaException, + ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary)); + } - @Override - public String toString() { - return "[" + replicaRequest + "][" + shard + "]"; - } - }); + @Override + public String toString() { + return "[" + replicaRequest + "][" + shard + "]"; + } + }; + + final String allocationId = shard.allocationId().getId(); + final RetryableAction<ReplicaResponse> replicationAction = new RetryableAction<ReplicaResponse>(logger, threadPool, + initialRetryBackoffBound, retryTimeout, replicationListener) { + + @Override + public void tryAction(ActionListener<ReplicaResponse> listener) { + replicasProxy.performOn(shard, replicaRequest, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener); + } + + @Override + public void onFinished() { + super.onFinished(); + pendingReplicationActions.removeReplicationAction(allocationId, this); + } + + @Override + public boolean shouldRetry(Exception e) { + final Throwable cause = ExceptionsHelper.unwrapCause(e); + return cause instanceof CircuitBreakingException || + cause instanceof EsRejectedExecutionException || + cause instanceof ConnectTransportException; + } + }; + + pendingReplicationActions.addPendingAction(allocationId, replicationAction); + replicationAction.run(); } private void updateCheckPoints(ShardRouting shard, LongSupplier localCheckpointSupplier, LongSupplier globalCheckpointSupplier) { @@ -396,6 +437,13 @@ public class ReplicationOperation< * @return the replication group */ ReplicationGroup getReplicationGroup(); + + /** + * Returns the pending replication actions on the primary shard + * + * @return the pending replication actions + */ + PendingReplicationActions getPendingReplicationActions(); } /** diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 039b002b1ca..6a4ab67bdae 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ 
b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -51,6 +51,8 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -96,6 +98,21 @@ public abstract class TransportReplicationAction< Response extends ReplicationResponse > extends TransportAction<Request, Response> { + /** + * The timeout for retrying replication requests. + */ + public static final Setting<TimeValue> REPLICATION_RETRY_TIMEOUT = + Setting.timeSetting("indices.replication.retry_timeout", TimeValue.timeValueSeconds(0), Setting.Property.Dynamic, + Setting.Property.NodeScope); + + /** + * The maximum bound for the first retry backoff for failed replication operations. The backoff bound + * will increase exponentially if failures continue. + */ + public static final Setting<TimeValue> REPLICATION_INITIAL_RETRY_BACKOFF_BOUND = + Setting.timeSetting("indices.replication.initial_retry_backoff_bound", TimeValue.timeValueMillis(50), TimeValue.timeValueMillis(10), + Setting.Property.Dynamic, Setting.Property.NodeScope); + protected final ThreadPool threadPool; protected final TransportService transportService; protected final ClusterService clusterService; @@ -109,6 +126,8 @@ public abstract class TransportReplicationAction< protected final String transportPrimaryAction; private final boolean syncGlobalCheckpointAfterOperation; + private volatile TimeValue initialRetryBackoffBound; + private volatile TimeValue retryTimeout; protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, IndicesService indicesService, @@ -137,6 +156,9 @@ public abstract class TransportReplicationAction< this.transportPrimaryAction = actionName + "[p]"; this.transportReplicaAction = actionName + "[r]"; + this.initialRetryBackoffBound = REPLICATION_INITIAL_RETRY_BACKOFF_BOUND.get(settings); + this.retryTimeout = REPLICATION_RETRY_TIMEOUT.get(settings); + transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest); transportService.registerRequestHandler(transportPrimaryAction, executor, forceExecutionOnPrimary, true, @@ -149,6 +171,10 @@ public abstract class TransportReplicationAction< this.transportOptions = transportOptions(settings); this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation; + + ClusterSettings clusterSettings = clusterService.getClusterSettings(); + clusterSettings.addSettingsUpdateConsumer(REPLICATION_INITIAL_RETRY_BACKOFF_BOUND, (v) -> initialRetryBackoffBound = v); + clusterSettings.addSettingsUpdateConsumer(REPLICATION_RETRY_TIMEOUT, (v) -> retryTimeout = v); } @Override @@ -371,7 +397,9 @@ public abstract class TransportReplicationAction< new ReplicationOperation<>(primaryRequest.getRequest(), primaryShardReference, ActionListener.map(responseListener, result -> result.finalResponseIfSuccessful), - newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm()).execute(); + newReplicasProxy(), logger, threadPool, actionName, primaryRequest.getPrimaryTerm(), initialRetryBackoffBound, + retryTimeout) + .execute(); } } catch 
(Exception e) { handleException(primaryShardReference, e); @@ -396,10 +424,6 @@ public abstract class TransportReplicationAction< } - protected ActionListener<Response> wrapResponseActionListener(ActionListener<Response> listener, IndexShard shard) { - return listener; - } - public static class PrimaryResult<ReplicaRequest extends ReplicationRequest<ReplicaRequest>, Response extends ReplicationResponse> implements ReplicationOperation.PrimaryResult<ReplicaRequest> { @@ -929,6 +953,11 @@ public abstract class TransportReplicationAction< public ReplicationGroup getReplicationGroup() { return indexShard.getReplicationGroup(); } + + @Override + public PendingReplicationActions getPendingReplicationActions() { + return indexShard.getPendingReplicationActions(); + } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 4b0c164bf20..5b910cf706f 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.DestructiveOperations; +import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.bootstrap.BootstrapSettings; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; @@ -347,6 +348,8 @@ public final class ClusterSettings extends AbstractScopedSettings { NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING, HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING, HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING, + TransportReplicationAction.REPLICATION_INITIAL_RETRY_BACKOFF_BOUND, + TransportReplicationAction.REPLICATION_RETRY_TIMEOUT, TransportSettings.HOST, TransportSettings.PUBLISH_HOST, TransportSettings.PUBLISH_HOST_PROFILE, diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 2e0604f2198..282c8f74bcf 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -55,6 +55,7 @@ import java.util.Objects; import java.util.OptionalLong; import java.util.Set; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongConsumer; import java.util.function.LongSupplier; @@ -223,6 +224,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L */ private final double fileBasedRecoveryThreshold; + private final Consumer<ReplicationGroup> onReplicationGroupUpdated; + /** * Get all retention leases tracked on this shard. * @@ -870,16 +873,31 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L return value.isPresent() ? 
value.getAsLong() : SequenceNumbers.UNASSIGNED_SEQ_NO; } + public ReplicationTracker( + final ShardId shardId, + final String allocationId, + final IndexSettings indexSettings, + final long operationPrimaryTerm, + final long globalCheckpoint, + final LongConsumer onGlobalCheckpointUpdated, + final LongSupplier currentTimeMillisSupplier, + final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases, + final Supplier<SafeCommitInfo> safeCommitInfoSupplier) { + this(shardId, allocationId, indexSettings, operationPrimaryTerm, globalCheckpoint, onGlobalCheckpointUpdated, + currentTimeMillisSupplier, onSyncRetentionLeases, safeCommitInfoSupplier, x -> {}); + } + /** * Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or * {@link SequenceNumbers#UNASSIGNED_SEQ_NO}. * - * @param shardId the shard ID - * @param allocationId the allocation ID - * @param indexSettings the index settings - * @param operationPrimaryTerm the current primary term - * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} - * @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires + * @param shardId the shard ID + * @param allocationId the allocation ID + * @param indexSettings the index settings + * @param operationPrimaryTerm the current primary term + * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} + * @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires + * @param onReplicationGroupUpdated a callback when the replica group changes */ public ReplicationTracker( final ShardId shardId, @@ -890,7 +908,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L final LongConsumer onGlobalCheckpointUpdated, final LongSupplier currentTimeMillisSupplier, final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases, - final Supplier<SafeCommitInfo> safeCommitInfoSupplier) { + final Supplier<SafeCommitInfo> safeCommitInfoSupplier, + final Consumer<ReplicationGroup> onReplicationGroupUpdated) { super(shardId, indexSettings); assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; this.shardAllocationId = allocationId; @@ -912,6 +931,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L indexSettings.getIndexMetadata().getState() == IndexMetadata.State.OPEN); this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings()); this.safeCommitInfoSupplier = safeCommitInfoSupplier; + this.onReplicationGroupUpdated = onReplicationGroupUpdated; assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false; assert invariant(); } @@ -926,10 +946,24 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L return replicationGroup; } + private void updateReplicationGroupAndNotify() { + assert Thread.holdsLock(this); + ReplicationGroup newReplicationGroup = calculateReplicationGroup(); + replicationGroup = newReplicationGroup; + onReplicationGroupUpdated.accept(newReplicationGroup); + } + private ReplicationGroup calculateReplicationGroup() { + long newVersion; + if (replicationGroup == null) { + newVersion = 0; + } else { + newVersion = replicationGroup.getVersion() + 1; + } return new ReplicationGroup(routingTable, checkpoints.entrySet().stream().filter(e -> 
e.getValue().inSync).map(Map.Entry::getKey).collect(Collectors.toSet()), - checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).map(Map.Entry::getKey).collect(Collectors.toSet())); + checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).map(Map.Entry::getKey).collect(Collectors.toSet()), + newVersion); } /** @@ -1098,7 +1132,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L } appliedClusterStateVersion = applyingClusterStateVersion; this.routingTable = routingTable; - replicationGroup = calculateReplicationGroup(); + updateReplicationGroupAndNotify(); if (primaryMode && removedEntries) { updateGlobalCheckpointOnPrimary(); // notify any waiter for local checkpoint advancement to recheck that their shard is still being tracked. @@ -1124,7 +1158,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L throw new IllegalStateException("no local checkpoint tracking information available"); } cps.tracked = true; - replicationGroup = calculateReplicationGroup(); + updateReplicationGroupAndNotify(); assert invariant(); } @@ -1169,7 +1203,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L } } else { cps.inSync = true; - replicationGroup = calculateReplicationGroup(); + updateReplicationGroupAndNotify(); logger.trace("marked [{}] as in-sync", allocationId); updateGlobalCheckpointOnPrimary(); } @@ -1214,7 +1248,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L pendingInSync.remove(allocationId); pending = false; cps.inSync = true; - replicationGroup = calculateReplicationGroup(); + updateReplicationGroupAndNotify(); logger.trace("marked [{}] as in-sync", allocationId); notifyAllWaiters(); } @@ -1342,7 +1376,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L checkpoints.put(entry.getKey(), entry.getValue().copy()); } routingTable = primaryContext.getRoutingTable(); - replicationGroup = calculateReplicationGroup(); + updateReplicationGroupAndNotify(); updateGlobalCheckpointOnPrimary(); // reapply missed cluster state update // note that if there was no cluster state update between start of the engine of this shard and the call to diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 84c8b77a677..490fa203496 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -46,6 +46,7 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; +import org.elasticsearch.action.support.replication.PendingReplicationActions; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; @@ -213,6 +214,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private final SearchOperationListener searchOperationListener; private final GlobalCheckpointListeners globalCheckpointListeners; + private final PendingReplicationActions pendingReplicationActions; private final ReplicationTracker replicationTracker; protected volatile ShardRouting shardRouting; @@ -338,6 +340,7 @@ 
public class IndexShard extends AbstractIndexShardComponent implements IndicesCl this.pendingPrimaryTerm = primaryTerm; this.globalCheckpointListeners = new GlobalCheckpointListeners(shardId, threadPool.scheduler(), logger); + this.pendingReplicationActions = new PendingReplicationActions(shardId, threadPool); this.replicationTracker = new ReplicationTracker( shardId, aId, @@ -347,7 +350,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl globalCheckpointListeners::globalCheckpointUpdated, threadPool::absoluteTimeInMillis, (retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, aId, getPendingPrimaryTerm(), retentionLeases, listener), - this::getSafeCommitInfo); + this::getSafeCommitInfo, + pendingReplicationActions); // the query cache is a node-level thing, however we want the most popular filters // to be computed on a per-shard basis @@ -1350,7 +1354,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } finally { // playing safe here and close the engine even if the above succeeds - close can be called multiple times // Also closing refreshListeners to prevent us from accumulating any more listeners - IOUtils.close(engine, globalCheckpointListeners, refreshListeners); + IOUtils.close(engine, globalCheckpointListeners, refreshListeners, pendingReplicationActions); indexShardOperationPermits.close(); } } @@ -2386,7 +2390,22 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl public ReplicationGroup getReplicationGroup() { assert assertPrimaryMode(); verifyNotClosed(); - return replicationTracker.getReplicationGroup(); + ReplicationGroup replicationGroup = replicationTracker.getReplicationGroup(); + // PendingReplicationActions is dependent on ReplicationGroup. Every time we expose ReplicationGroup, + // ensure PendingReplicationActions is updated with the newest version to prevent races. + pendingReplicationActions.accept(replicationGroup); + return replicationGroup; + } + + /** + * Returns the pending replication actions for the shard. 
+ * + * @return the pending replication actions + */ + public PendingReplicationActions getPendingReplicationActions() { + assert assertPrimaryMode(); + verifyNotClosed(); + return pendingReplicationActions; } /** diff --git a/server/src/main/java/org/elasticsearch/index/shard/ReplicationGroup.java b/server/src/main/java/org/elasticsearch/index/shard/ReplicationGroup.java index 4f63d746f51..16cb01662a3 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/ReplicationGroup.java +++ b/server/src/main/java/org/elasticsearch/index/shard/ReplicationGroup.java @@ -34,15 +34,18 @@ public class ReplicationGroup { private final IndexShardRoutingTable routingTable; private final Set<String> inSyncAllocationIds; private final Set<String> trackedAllocationIds; + private final long version; private final Set<String> unavailableInSyncShards; // derived from the other fields private final List<ShardRouting> replicationTargets; // derived from the other fields private final List<ShardRouting> skippedShards; // derived from the other fields - public ReplicationGroup(IndexShardRoutingTable routingTable, Set<String> inSyncAllocationIds, Set<String> trackedAllocationIds) { + public ReplicationGroup(IndexShardRoutingTable routingTable, Set<String> inSyncAllocationIds, Set<String> trackedAllocationIds, + long version) { this.routingTable = routingTable; this.inSyncAllocationIds = inSyncAllocationIds; this.trackedAllocationIds = trackedAllocationIds; + this.version = version; this.unavailableInSyncShards = Sets.difference(inSyncAllocationIds, routingTable.getAllAllocationIds()); this.replicationTargets = new ArrayList<>(); @@ -73,6 +76,10 @@ public class ReplicationGroup { } } + public long getVersion() { + return version; + } + public IndexShardRoutingTable getRoutingTable() { return routingTable; } @@ -81,6 +88,10 @@ public class ReplicationGroup { return inSyncAllocationIds; } + public Set<String> getTrackedAllocationIds() { + return trackedAllocationIds; + } + /** * Returns the set of shard allocation ids that are in the in-sync set but have no assigned routing entry */ diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 858568649f6..9bbf9028acc 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.replication.PendingReplicationActions; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; @@ -40,6 +41,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; @@ -229,7 +231,7 @@ public class 
TransportVerifyShardBeforeCloseActionTests extends ESTestCase { unavailableShards.forEach(shardRoutingTableBuilder::removeShard); shardRoutingTable = shardRoutingTableBuilder.build(); - final ReplicationGroup replicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards); + final ReplicationGroup replicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards, 0); assertThat(replicationGroup.getUnavailableInSyncShards().size(), greaterThan(0)); final PlainActionFuture<PrimaryResult> listener = new PlainActionFuture<>(); @@ -239,7 +241,8 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase { ReplicationOperation.Replicas<TransportVerifyShardBeforeCloseAction.ShardRequest> proxy = action.newReplicasProxy(); ReplicationOperation<TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, PrimaryResult> operation = new ReplicationOperation<>( - request, createPrimary(primaryRouting, replicationGroup), listener, proxy, logger, "test", primaryTerm); + request, createPrimary(primaryRouting, replicationGroup), listener, proxy, logger, threadPool, "test", primaryTerm, + TimeValue.timeValueMillis(20), TimeValue.timeValueSeconds(60)); operation.execute(); final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); @@ -276,13 +279,23 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase { TransportVerifyShardBeforeCloseAction.ShardRequest, PrimaryResult> createPrimary(final ShardRouting primary, final ReplicationGroup replicationGroup) { - return new ReplicationOperation.Primary<TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, PrimaryResult>() { + final PendingReplicationActions replicationActions = new PendingReplicationActions(primary.shardId(), threadPool); + replicationActions.accept(replicationGroup); + return new ReplicationOperation.Primary< + TransportVerifyShardBeforeCloseAction.ShardRequest, + TransportVerifyShardBeforeCloseAction.ShardRequest, + PrimaryResult>() { + @Override public ShardRouting routingEntry() { return primary; } + @Override + public PendingReplicationActions getPendingReplicationActions() { + return replicationActions; + } + @Override public ReplicationGroup getReplicationGroup() { return replicationGroup; diff --git a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java index 2bd37b99763..7210a35be39 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java @@ -134,7 +134,7 @@ public class TransportResyncReplicationActionTests extends ESTestCase { when(indexShard.getReplicationGroup()).thenReturn( new ReplicationGroup(shardRoutingTable, clusterService.state().metadata().index(index).inSyncAllocationIds(shardId.id()), - shardRoutingTable.getAllAllocationIds())); + shardRoutingTable.getAllAllocationIds(), 0)); final IndexService indexService = mock(IndexService.class); when(indexService.getShard(eq(shardId.id()))).thenReturn(indexShard); diff --git a/server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java b/server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java index ff64efa83cc..34608d5183c 100644 --- a/server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java @@ -149,6 +149,30 @@ public class RetryableActionTests extends ESTestCase { 
expectThrows(EsRejectedExecutionException.class, future::actionGet); } + public void testTimeoutOfZeroMeansNoRetry() { + final AtomicInteger executedCount = new AtomicInteger(); + final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture(); + final RetryableAction<Boolean> retryableAction = new RetryableAction<Boolean>(logger, taskQueue.getThreadPool(), + TimeValue.timeValueMillis(10), TimeValue.timeValueSeconds(0), future) { + + @Override + public void tryAction(ActionListener<Boolean> listener) { + executedCount.getAndIncrement(); + throw new EsRejectedExecutionException(); + } + + @Override + public boolean shouldRetry(Exception e) { + return e instanceof EsRejectedExecutionException; + } + }; + retryableAction.run(); + taskQueue.runAllRunnableTasks(); + + assertEquals(1, executedCount.get()); + expectThrows(EsRejectedExecutionException.class, future::actionGet); + } + public void testFailedBecauseNotRetryable() { + final AtomicInteger executedCount = new AtomicInteger(); + final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture(); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/PendingReplicationActionsTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/PendingReplicationActionsTests.java new file mode 100644 index 00000000000..cccfdfe28ec --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/support/replication/PendingReplicationActionsTests.java @@ -0,0 +1,123 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.action.support.replication; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.RetryableAction; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.shard.IndexShardClosedException; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; + +import java.util.Collections; + +public class PendingReplicationActionsTests extends ESTestCase { + + private TestThreadPool threadPool; + private ShardId shardId; + private PendingReplicationActions pendingReplication; + + @Override + public void setUp() throws Exception { + super.setUp(); + shardId = new ShardId("index", UUIDs.randomBase64UUID(), 0); + threadPool = new TestThreadPool(getTestName()); + pendingReplication = new PendingReplicationActions(shardId, threadPool); + } + + @Override + public void tearDown() throws Exception { + terminate(threadPool); + super.tearDown(); + } + + public void testAllocationIdActionCanBeRun() { + String allocationId = UUIDs.randomBase64UUID(); + PlainActionFuture<Void> future = PlainActionFuture.newFuture(); + pendingReplication.acceptNewTrackedAllocationIds(Collections.singleton(allocationId)); + TestAction action = new TestAction(future); + pendingReplication.addPendingAction(allocationId, action); + action.run(); + future.actionGet(); + assertTrue(future.isDone()); + } + + public void testMissingAllocationIdActionWillBeCancelled() { + String allocationId = UUIDs.randomBase64UUID(); + PlainActionFuture<Void> future = PlainActionFuture.newFuture(); + TestAction action = new TestAction(future); + pendingReplication.addPendingAction(allocationId, action); + expectThrows(IndexShardClosedException.class, future::actionGet); + } + + public void testAllocationIdActionWillBeCancelledIfTrackedAllocationChanges() { + String allocationId = UUIDs.randomBase64UUID(); + PlainActionFuture<Void> future = PlainActionFuture.newFuture(); + pendingReplication.acceptNewTrackedAllocationIds(Collections.singleton(allocationId)); + TestAction action = new TestAction(future, false); + pendingReplication.addPendingAction(allocationId, action); + action.run(); + pendingReplication.acceptNewTrackedAllocationIds(Collections.emptySet()); + expectThrows(IndexShardClosedException.class, future::actionGet); + } + + public void testAllocationIdActionWillBeCancelledOnClose() { + String allocationId = UUIDs.randomBase64UUID(); + PlainActionFuture<Void> future = PlainActionFuture.newFuture(); + pendingReplication.acceptNewTrackedAllocationIds(Collections.singleton(allocationId)); + TestAction action = new TestAction(future, false); + pendingReplication.addPendingAction(allocationId, action); + action.run(); + pendingReplication.close(); + expectThrows(IndexShardClosedException.class, future::actionGet); + } + + private class TestAction extends RetryableAction<Void> { + + private final boolean succeed; + private final Exception retryable = new Exception(); + + private TestAction(ActionListener<Void> listener) { + this(listener, true); + } + + private TestAction(ActionListener<Void> listener, boolean succeed) { + super(logger, threadPool, TimeValue.timeValueMillis(1), TimeValue.timeValueMinutes(1), listener); + this.succeed = succeed; + } + + @Override + public void tryAction(ActionListener<Void> listener) { + if (succeed) { + listener.onResponse(null); + } else { + listener.onFailure(retryable); + } + } + + 
@Override + public boolean shouldRetry(Exception e) { + return retryable == e; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 6830a8613f9..45778056337 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -35,7 +35,12 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.shard.IndexShardNotStartedException; import org.elasticsearch.index.shard.IndexShardState; @@ -43,8 +48,13 @@ import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.SendRequestTransportException; +import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -54,6 +64,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -67,6 +78,20 @@ import static org.hamcrest.Matchers.nullValue; public class ReplicationOperationTests extends ESTestCase { + private ThreadPool threadPool; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool(getTestName()); + } + + @Override + public void tearDown() throws Exception { + terminate(threadPool); + super.tearDown(); + } + public void testReplication() throws Exception { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); @@ -92,7 +117,7 @@ public class ReplicationOperationTests extends ESTestCase { addTrackingInfo(indexShardRoutingTable, primaryShard, trackedShards, untrackedShards); trackedShards.addAll(staleAllocationIds); - final ReplicationGroup replicationGroup = new ReplicationGroup(indexShardRoutingTable, inSyncAllocationIds, trackedShards); + final ReplicationGroup replicationGroup = new ReplicationGroup(indexShardRoutingTable, inSyncAllocationIds, trackedShards, 0); final Set expectedReplicas = getExpectedReplicas(shardId, initialState, trackedShards); @@ -117,7 +142,7 @@ public class ReplicationOperationTests extends ESTestCase { PlainActionFuture listener = new PlainActionFuture<>(); final TestReplicaProxy replicasProxy = new 
TestReplicaProxy(simulatedFailures); - final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup); + final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup, threadPool); final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy, primaryTerm); op.execute(); assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); @@ -141,6 +166,81 @@ public class ReplicationOperationTests extends ESTestCase { assertThat(primary.knownGlobalCheckpoints, equalTo(replicasProxy.generatedGlobalCheckpoints)); } + public void testRetryTransientReplicationFailure() throws Exception { + final String index = "test"; + final ShardId shardId = new ShardId(index, "_na_", 0); + + ClusterState initialState = stateWithActivePrimary(index, true, randomInt(5)); + IndexMetadata indexMetadata = initialState.getMetadata().index(index); + final long primaryTerm = indexMetadata.primaryTerm(0); + final IndexShardRoutingTable indexShardRoutingTable = initialState.getRoutingTable().shardRoutingTable(shardId); + ShardRouting primaryShard = indexShardRoutingTable.primaryShard(); + if (primaryShard.relocating() && randomBoolean()) { + // simulate execution of the replication phase on the relocation target node after relocation source was marked as relocated + initialState = ClusterState.builder(initialState) + .nodes(DiscoveryNodes.builder(initialState.nodes()).localNodeId(primaryShard.relocatingNodeId())).build(); + primaryShard = primaryShard.getTargetRelocatingShard(); + } + // add a few in-sync allocation ids that don't have corresponding routing entries + final Set staleAllocationIds = Sets.newHashSet(generateRandomStringArray(4, 10, false)); + + final Set inSyncAllocationIds = Sets.union(indexMetadata.inSyncAllocationIds(0), staleAllocationIds); + + final Set trackedShards = new HashSet<>(); + final Set untrackedShards = new HashSet<>(); + addTrackingInfo(indexShardRoutingTable, primaryShard, trackedShards, untrackedShards); + trackedShards.addAll(staleAllocationIds); + + final ReplicationGroup replicationGroup = new ReplicationGroup(indexShardRoutingTable, inSyncAllocationIds, trackedShards, 0); + + final Set expectedReplicas = getExpectedReplicas(shardId, initialState, trackedShards); + + final Map simulatedFailures = new HashMap<>(); + for (ShardRouting replica : expectedReplicas) { + Exception cause; + Exception exception; + if (randomBoolean()) { + if (randomBoolean()) { + cause = new CircuitBreakingException("broken", CircuitBreaker.Durability.PERMANENT); + } else { + cause = new EsRejectedExecutionException("rejected"); + } + exception = new RemoteTransportException("remote", cause); + } else { + TransportAddress address = new TransportAddress(InetAddress.getLoopbackAddress(), 9300); + DiscoveryNode node = new DiscoveryNode("replica", address, Version.CURRENT); + cause = new ConnectTransportException(node, "broken"); + exception = cause; + } + logger.debug("--> simulating failure on {} with [{}]", replica, exception.getClass().getSimpleName()); + simulatedFailures.put(replica, exception); + } + + Request request = new Request(shardId); + PlainActionFuture listener = new PlainActionFuture<>(); + final TestReplicaProxy replicasProxy = new TestReplicaProxy(simulatedFailures, true); + + final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup, threadPool); + final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy, 
primaryTerm, + TimeValue.timeValueMillis(20), TimeValue.timeValueSeconds(60)); + op.execute(); + assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); + assertThat(request.processedOnReplicas, equalTo(expectedReplicas)); + assertThat(replicasProxy.failedReplicas.size(), equalTo(0)); + assertThat(replicasProxy.markedAsStaleCopies, equalTo(staleAllocationIds)); + assertThat("post replication operations not run on primary", request.runPostReplicationActionsOnPrimary.get(), equalTo(true)); + ShardInfo shardInfo = listener.actionGet().getShardInfo(); + assertThat(shardInfo.getSuccessful(), equalTo(1 + expectedReplicas.size())); + final List unassignedShards = indexShardRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED); + final int totalShards = 1 + expectedReplicas.size() + unassignedShards.size() + untrackedShards.size(); + assertThat(replicationGroup.toString(), shardInfo.getTotal(), equalTo(totalShards)); + + assertThat(primary.knownLocalCheckpoints.remove(primaryShard.allocationId().getId()), equalTo(primary.localCheckpoint)); + assertThat(primary.knownLocalCheckpoints, equalTo(replicasProxy.generatedLocalCheckpoints)); + assertThat(primary.knownGlobalCheckpoints.remove(primaryShard.allocationId().getId()), equalTo(primary.globalCheckpoint)); + assertThat(primary.knownGlobalCheckpoints, equalTo(replicasProxy.generatedGlobalCheckpoints)); + } + private void addTrackingInfo(IndexShardRoutingTable indexShardRoutingTable, ShardRouting primaryShard, Set trackedShards, Set untrackedShards) { for (ShardRouting shr : indexShardRoutingTable.shards()) { @@ -187,7 +287,7 @@ public class ReplicationOperationTests extends ESTestCase { addTrackingInfo(indexShardRoutingTable, primaryShard, trackedShards, new HashSet<>()); trackedShards.addAll(staleAllocationIds); - final ReplicationGroup replicationGroup = new ReplicationGroup(indexShardRoutingTable, inSyncAllocationIds, trackedShards); + final ReplicationGroup replicationGroup = new ReplicationGroup(indexShardRoutingTable, inSyncAllocationIds, trackedShards, 0); final Set expectedReplicas = getExpectedReplicas(shardId, initialState, trackedShards); @@ -234,7 +334,7 @@ public class ReplicationOperationTests extends ESTestCase { } }; AtomicBoolean primaryFailed = new AtomicBoolean(); - final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup) { + final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup, threadPool) { @Override public void failShard(String message, Exception exception) { assertThat(exception, instanceOf(ShardStateAction.NoLongerPrimaryShardException.class)); @@ -263,7 +363,7 @@ public class ReplicationOperationTests extends ESTestCase { IndexShardRoutingTable shardRoutingTable = initialState.getRoutingTable().shardRoutingTable(shardId); Set trackedShards = new HashSet<>(); addTrackingInfo(shardRoutingTable, null, trackedShards, new HashSet<>()); - ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards); + ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards, 0); final ClusterState stateWithAddedReplicas; if (randomBoolean()) { @@ -278,13 +378,13 @@ public class ReplicationOperationTests extends ESTestCase { trackedShards = new HashSet<>(); addTrackingInfo(shardRoutingTable, null, trackedShards, new HashSet<>()); - ReplicationGroup updatedReplicationGroup = new ReplicationGroup(shardRoutingTable, 
inSyncAllocationIds, trackedShards); + ReplicationGroup updatedReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards, 0); final AtomicReference replicationGroup = new AtomicReference<>(initialReplicationGroup); logger.debug("--> using initial replicationGroup:\n{}", replicationGroup.get()); final long primaryTerm = initialState.getMetadata().index(shardId.getIndexName()).primaryTerm(shardId.id()); final ShardRouting primaryShard = updatedReplicationGroup.getRoutingTable().primaryShard(); - final TestPrimary primary = new TestPrimary(primaryShard, replicationGroup::get) { + final TestPrimary primary = new TestPrimary(primaryShard, replicationGroup::get, threadPool) { @Override public void perform(Request request, ActionListener listener) { super.perform(request, ActionListener.map(listener, result -> { @@ -336,13 +436,13 @@ public class ReplicationOperationTests extends ESTestCase { final Set inSyncAllocationIds = state.metadata().index(index).inSyncAllocationIds(0); Set trackedShards = new HashSet<>(); addTrackingInfo(shardRoutingTable, null, trackedShards, new HashSet<>()); - final ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards); + final ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards, 0); PlainActionFuture listener = new PlainActionFuture<>(); final ShardRouting primaryShard = shardRoutingTable.primaryShard(); final TestReplicationOperation op = new TestReplicationOperation(request, - new TestPrimary(primaryShard, () -> initialReplicationGroup), - listener, new TestReplicaProxy(), logger, "test", primaryTerm); + new TestPrimary(primaryShard, () -> initialReplicationGroup, threadPool), + listener, new TestReplicaProxy(), logger, threadPool, "test", primaryTerm); if (passesActiveShardCheck) { assertThat(op.checkActiveShardCount(), nullValue()); @@ -372,12 +472,12 @@ public class ReplicationOperationTests extends ESTestCase { final Set inSyncAllocationIds = indexMetadata.inSyncAllocationIds(0); final IndexShardRoutingTable shardRoutingTable = state.routingTable().index(index).shard(shardId.id()); final Set trackedShards = shardRoutingTable.getAllAllocationIds(); - final ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards); + final ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards, 0); final boolean fatal = randomBoolean(); final AtomicBoolean primaryFailed = new AtomicBoolean(); final ReplicationOperation.Primary primary = - new TestPrimary(primaryRouting, () -> initialReplicationGroup) { + new TestPrimary(primaryRouting, () -> initialReplicationGroup, threadPool) { @Override public void failShard(String message, Exception exception) { @@ -460,15 +560,17 @@ public class ReplicationOperationTests extends ESTestCase { final long globalCheckpoint; final long maxSeqNoOfUpdatesOrDeletes; final Supplier replicationGroupSupplier; + final PendingReplicationActions pendingReplicationActions; final Map knownLocalCheckpoints = new HashMap<>(); final Map knownGlobalCheckpoints = new HashMap<>(); - TestPrimary(ShardRouting routing, Supplier replicationGroupSupplier) { + TestPrimary(ShardRouting routing, Supplier replicationGroupSupplier, ThreadPool threadPool) { this.routing = routing; this.replicationGroupSupplier = replicationGroupSupplier; this.localCheckpoint = random().nextLong(); 
           this.globalCheckpoint = randomNonNegativeLong();
           this.maxSeqNoOfUpdatesOrDeletes = randomNonNegativeLong();
+           this.pendingReplicationActions = new PendingReplicationActions(routing.shardId(), threadPool);
        }
 
        @Override
@@ -555,6 +657,11 @@ public class ReplicationOperationTests extends ESTestCase {
            return replicationGroupSupplier.get();
        }
 
+        @Override
+        public PendingReplicationActions getPendingReplicationActions() {
+            pendingReplicationActions.accept(getReplicationGroup());
+            return pendingReplicationActions;
+        }
    }
 
    static class ReplicaResponse implements ReplicationOperation.ReplicaResponse {
@@ -580,7 +687,10 @@ public class ReplicationOperationTests extends ESTestCase {
 
    static class TestReplicaProxy implements ReplicationOperation.Replicas<ReplicaRequest> {
 
+        private final int attemptsBeforeSuccess;
+        private final AtomicInteger attemptsNumber = new AtomicInteger(0);
        final Map<ShardRouting, Exception> opFailures;
+        private final boolean retryable;
 
        final Set<ShardRouting> failedReplicas = ConcurrentCollections.newConcurrentSet();
 
@@ -595,7 +705,17 @@ public class ReplicationOperationTests extends ESTestCase {
        }
 
        TestReplicaProxy(Map<ShardRouting, Exception> opFailures) {
+            this(opFailures, false);
+        }
+
+        TestReplicaProxy(Map<ShardRouting, Exception> opFailures, boolean retryable) {
            this.opFailures = opFailures;
+            this.retryable = retryable;
+            if (retryable) {
+                attemptsBeforeSuccess = randomInt(2) + 1;
+            } else {
+                attemptsBeforeSuccess = Integer.MAX_VALUE;
+            }
        }
 
@@ -606,9 +726,19 @@ public class ReplicationOperationTests extends ESTestCase {
                final long globalCheckpoint,
                final long maxSeqNoOfUpdatesOrDeletes,
                final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
-            assertTrue("replica request processed twice on [" + replica + "]", request.processedOnReplicas.add(replica));
-            assertFalse("primary post replication actions should run after replication", request.runPostReplicationActionsOnPrimary.get());
-            if (opFailures.containsKey(replica)) {
+            boolean added = request.processedOnReplicas.add(replica);
+            if (retryable == false) {
+                assertTrue("replica request processed twice on [" + replica + "]", added);
+            }
+            // If replication is not retryable OR this is the first attempt, the post replication actions
+            // should not have run.
+            if (retryable == false || added) {
+                assertFalse("primary post replication actions should run after replication",
+                    request.runPostReplicationActionsOnPrimary.get());
+            }
+            // If this is a retryable scenario and this is the second try, we finish successfully
+            int n = attemptsNumber.incrementAndGet();
+            if (opFailures.containsKey(replica) && n <= attemptsBeforeSuccess) {
                listener.onFailure(opFailures.get(replica));
            } else {
                final long generatedLocalCheckpoint = random().nextLong();
@@ -643,15 +773,31 @@ public class ReplicationOperationTests extends ESTestCase {
    }
 
    class TestReplicationOperation extends ReplicationOperation<Request, ReplicaRequest, TestPrimary.Result> {
+
        TestReplicationOperation(Request request, Primary<Request, ReplicaRequest, TestPrimary.Result> primary,
-                ActionListener<TestPrimary.Result> listener, Replicas<ReplicaRequest> replicas, long primaryTerm) {
-            this(request, primary, listener, replicas, ReplicationOperationTests.this.logger, "test", primaryTerm);
+                ActionListener<TestPrimary.Result> listener, Replicas<ReplicaRequest> replicas, long primaryTerm,
+                TimeValue initialRetryBackoffBound, TimeValue retryTimeout) {
+            this(request, primary, listener, replicas, ReplicationOperationTests.this.logger, threadPool, "test", primaryTerm,
+                initialRetryBackoffBound, retryTimeout);
+        }
+
+        TestReplicationOperation(Request request, Primary<Request, ReplicaRequest, TestPrimary.Result> primary,
+                ActionListener<TestPrimary.Result> listener, Replicas<ReplicaRequest> replicas, long primaryTerm) {
+            this(request, primary, listener, replicas, ReplicationOperationTests.this.logger, threadPool, "test", primaryTerm);
        }
 
        TestReplicationOperation(Request request, Primary<Request, ReplicaRequest, TestPrimary.Result> primary, ActionListener<TestPrimary.Result> listener,
-                Replicas<ReplicaRequest> replicas, Logger logger, String opType, long primaryTerm) {
-            super(request, primary, listener, replicas, logger, opType, primaryTerm);
+                Replicas<ReplicaRequest> replicas, Logger logger, ThreadPool threadPool, String opType, long primaryTerm) {
+            this(request, primary, listener, replicas, logger, threadPool, opType, primaryTerm, TimeValue.timeValueMillis(50),
+                TimeValue.timeValueSeconds(1));
+        }
+
+        TestReplicationOperation(Request request, Primary<Request, ReplicaRequest, TestPrimary.Result> primary,
+                ActionListener<TestPrimary.Result> listener,
+                Replicas<ReplicaRequest> replicas, Logger logger, ThreadPool threadPool, String opType, long primaryTerm,
+                TimeValue initialRetryBackoffBound, TimeValue retryTimeout) {
+            super(request, primary, listener, replicas, logger, threadPool, opType, primaryTerm, initialRetryBackoffBound, retryTimeout);
+        }
    }
diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
index 382b7a39774..fd6cacf7739 100644
--- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
@@ -828,10 +828,12 @@ public class TransportReplicationActionTests extends ESTestCase {
        IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().shardRoutingTable(shardId);
        Set<String> inSyncIds = randomBoolean() ? singleton(routingEntry.allocationId().getId()) :
            clusterService.state().metadata().index(index).inSyncAllocationIds(0);
+        ReplicationGroup replicationGroup = new ReplicationGroup(shardRoutingTable, inSyncIds, shardRoutingTable.getAllAllocationIds(), 0);
        when(shard.getReplicationGroup()).thenReturn(
-            new ReplicationGroup(shardRoutingTable,
-                inSyncIds,
-                shardRoutingTable.getAllAllocationIds()));
+            replicationGroup);
+        PendingReplicationActions replicationActions = new PendingReplicationActions(shardId, threadPool);
+        replicationActions.accept(replicationGroup);
+        when(shard.getPendingReplicationActions()).thenReturn(replicationActions);
        doAnswer(invocation -> {
            count.incrementAndGet();
            //noinspection unchecked
diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
index 3b9aa25bc0c..42a696c3b25 100644
--- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
@@ -359,7 +359,7 @@ public class TransportWriteActionTests extends ESTestCase {
        protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentFailureOnReplica) {
            super(Settings.EMPTY, "internal:test",
                new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
-                    x -> null, null, Collections.emptySet()), null, null, null, null,
+                    x -> null, null, Collections.emptySet()), TransportWriteActionTests.this.clusterService, null, null, null,
                new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false);
            this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
            this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
index 0bbe83a25fe..cb33caf8046 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
@@ -39,6 +39,7 @@ import org.elasticsearch.action.resync.TransportResyncReplicationAction;
 import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.support.replication.PendingReplicationActions;
 import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
 import org.elasticsearch.action.support.replication.ReplicationOperation;
 import org.elasticsearch.action.support.replication.ReplicationRequest;
@@ -63,6 +64,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
@@ -608,7 +610,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
                        adaptResponse(result.finalResponse, getPrimaryShard());
                        return result.finalResponse;
                    }),
-                    new ReplicasRef(), logger, opType, primaryTerm)
+                    new ReplicasRef(), logger, threadPool, opType, primaryTerm, TimeValue.timeValueMillis(20),
+                    TimeValue.timeValueSeconds(60))
                    .execute();
            } catch (Exception e) {
                listener.onFailure(e);
@@ -680,6 +683,10 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
            return getPrimaryShard().getReplicationGroup();
        }
 
+        @Override
+        public PendingReplicationActions getPendingReplicationActions() {
+            return getPrimaryShard().getPendingReplicationActions();
+        }
    }
 
    class ReplicasRef implements ReplicationOperation.Replicas<ReplicaRequest> {
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
index e8a26a05e23..ac1a330e6cc 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
@@ -37,6 +37,7 @@ import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExc
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag;
+import org.elasticsearch.action.support.replication.TransportReplicationAction;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.cluster.ClusterName;
@@ -525,6 +526,12 @@ public final class InternalTestCluster extends TestCluster {
            builder.put(ScriptService.SCRIPT_GENERAL_CACHE_EXPIRE_SETTING.getKey(),
                timeValueMillis(RandomNumbers.randomIntBetween(random, 750, 10000000)).getStringRep());
        }
+        if (random.nextBoolean()) {
+            int initialMillisBound = RandomNumbers.randomIntBetween(random, 10, 100);
+            builder.put(TransportReplicationAction.REPLICATION_INITIAL_RETRY_BACKOFF_BOUND.getKey(), timeValueMillis(initialMillisBound));
+            int retryTimeoutSeconds = RandomNumbers.randomIntBetween(random, 0, 60);
+            builder.put(TransportReplicationAction.REPLICATION_RETRY_TIMEOUT.getKey(), timeValueSeconds(retryTimeoutSeconds));
+        }
        return builder.build();
    }
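
Reviewer note (not part of the diff): the InternalTestCluster hunk above randomizes the two new replication retry settings per test run. As a minimal sketch of the alternative, a test that needs deterministic behaviour could pin them to fixed values instead. The snippet below only reuses identifiers that already appear in this change (TransportReplicationAction.REPLICATION_INITIAL_RETRY_BACKOFF_BOUND, TransportReplicationAction.REPLICATION_RETRY_TIMEOUT, Settings.builder().put(String, TimeValue)); the class and method names are hypothetical and the chosen values are arbitrary.

    import org.elasticsearch.action.support.replication.TransportReplicationAction;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.unit.TimeValue;

    // Illustrative sketch only: builds node settings that fix the replication retry knobs
    // instead of randomizing them the way InternalTestCluster does in the hunk above.
    public final class ReplicationRetrySettingsSketch {

        private ReplicationRetrySettingsSketch() {}

        public static Settings fixedRetrySettings() {
            return Settings.builder()
                // Upper bound on the randomized backoff before the first replica retry.
                .put(TransportReplicationAction.REPLICATION_INITIAL_RETRY_BACKOFF_BOUND.getKey(),
                    TimeValue.timeValueMillis(50))
                // Total time after which a retried replica operation stops retrying.
                .put(TransportReplicationAction.REPLICATION_RETRY_TIMEOUT.getKey(),
                    TimeValue.timeValueSeconds(30))
                .build();
        }
    }

Such settings could be returned from nodeSettings() in an integration test; a zero-second retry timeout, as exercised by the randomized range above, effectively disables retries.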