Retry failed replication due to transient errors (#56230)

Currently a failed replication action will fail an entire replica. This
includes when replication fails due to potentially short lived transient
issues such as network distruptions or circuit breaking errors.

This commit implements retries using the retryable action.
This commit is contained in:
Tim Brooks 2020-06-17 10:17:30 -06:00 committed by GitHub
parent 5ddea03de7
commit 2074412d79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 671 additions and 89 deletions

View File

@ -508,9 +508,11 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
final ShardId shardId = new ShardId(clusterService().state().metadata().index("test").getIndex(), 0);
final Set<String> replicaNodes = new HashSet<>(internalCluster().startDataOnlyNodes(numberOfReplicas));
ensureGreen();
String timeout = randomFrom("0s", "1s", "2s");
assertAcked(
client(master).admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "none")).get());
.setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "none"))
.setPersistentSettings(Settings.builder().put("indices.replication.retry_timeout", timeout)).get());
logger.info("--> Indexing with gap in seqno to ensure that some operations will be replayed in resync");
long numDocs = scaledRandomIntBetween(5, 50);
for (int i = 0; i < numDocs; i++) {

View File

@ -65,7 +65,7 @@ public abstract class RetryableAction<Response> {
if (initialDelayMillis < 1) {
throw new IllegalArgumentException("Initial delay was less than 1 millisecond: " + initialDelay);
}
this.timeoutMillis = Math.max(timeoutValue.getMillis(), 1);
this.timeoutMillis = timeoutValue.getMillis();
this.startMillis = threadPool.relativeTimeInMillis();
this.finalListener = listener;
this.executor = executor;

View File

@ -0,0 +1,114 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.replication;
import org.elasticsearch.action.support.RetryableAction;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
public class PendingReplicationActions implements Consumer<ReplicationGroup>, Releasable {
private final Map<String, Set<RetryableAction<?>>> onGoingReplicationActions = ConcurrentCollections.newConcurrentMap();
private final ShardId shardId;
private final ThreadPool threadPool;
private volatile long replicationGroupVersion = -1;
public PendingReplicationActions(ShardId shardId, ThreadPool threadPool) {
this.shardId = shardId;
this.threadPool = threadPool;
}
public void addPendingAction(String allocationId, RetryableAction<?> replicationAction) {
Set<RetryableAction<?>> ongoingActionsOnNode = onGoingReplicationActions.get(allocationId);
if (ongoingActionsOnNode != null) {
ongoingActionsOnNode.add(replicationAction);
if (onGoingReplicationActions.containsKey(allocationId) == false) {
replicationAction.cancel(new IndexShardClosedException(shardId,
"Replica unavailable - replica could have left ReplicationGroup or IndexShard might have closed"));
}
} else {
replicationAction.cancel(new IndexShardClosedException(shardId,
"Replica unavailable - replica could have left ReplicationGroup or IndexShard might have closed"));
}
}
public void removeReplicationAction(String allocationId, RetryableAction<?> action) {
Set<RetryableAction<?>> ongoingActionsOnNode = onGoingReplicationActions.get(allocationId);
if (ongoingActionsOnNode != null) {
ongoingActionsOnNode.remove(action);
}
}
@Override
public void accept(ReplicationGroup replicationGroup) {
if (isNewerVersion(replicationGroup)) {
synchronized (this) {
if (isNewerVersion(replicationGroup)) {
acceptNewTrackedAllocationIds(replicationGroup.getTrackedAllocationIds());
replicationGroupVersion = replicationGroup.getVersion();
}
}
}
}
private boolean isNewerVersion(ReplicationGroup replicationGroup) {
// Relative comparison to mitigate long overflow
return replicationGroup.getVersion() - replicationGroupVersion > 0;
}
// Visible for testing
synchronized void acceptNewTrackedAllocationIds(Set<String> trackedAllocationIds) {
for (String targetAllocationId : trackedAllocationIds) {
onGoingReplicationActions.putIfAbsent(targetAllocationId, ConcurrentCollections.newConcurrentSet());
}
ArrayList<Set<RetryableAction<?>>> toCancel = new ArrayList<>();
for (String allocationId : onGoingReplicationActions.keySet()) {
if (trackedAllocationIds.contains(allocationId) == false) {
toCancel.add(onGoingReplicationActions.remove(allocationId));
}
}
cancelActions(toCancel, "Replica left ReplicationGroup");
}
@Override
public synchronized void close() {
ArrayList<Set<RetryableAction<?>>> toCancel = new ArrayList<>(onGoingReplicationActions.values());
onGoingReplicationActions.clear();
cancelActions(toCancel, "Primary closed.");
}
private void cancelActions(ArrayList<Set<RetryableAction<?>>> toCancel, String message) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> toCancel.stream()
.flatMap(Collection::stream)
.forEach(action -> action.cancel(new IndexShardClosedException(shardId, message))));
}
}

View File

@ -27,17 +27,23 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.RetryableAction;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import java.io.IOException;
import java.util.ArrayList;
@ -54,6 +60,7 @@ public class ReplicationOperation<
PrimaryResultT extends ReplicationOperation.PrimaryResult<ReplicaRequest>
> {
private final Logger logger;
private final ThreadPool threadPool;
private final Request request;
private final String opType;
private final AtomicInteger totalShards = new AtomicInteger();
@ -72,6 +79,8 @@ public class ReplicationOperation<
private final Primary<Request, ReplicaRequest, PrimaryResultT> primary;
private final Replicas<ReplicaRequest> replicasProxy;
private final AtomicBoolean finished = new AtomicBoolean();
private final TimeValue initialRetryBackoffBound;
private final TimeValue retryTimeout;
private final long primaryTerm;
// exposed for tests
@ -84,14 +93,18 @@ public class ReplicationOperation<
public ReplicationOperation(Request request, Primary<Request, ReplicaRequest, PrimaryResultT> primary,
ActionListener<PrimaryResultT> listener,
Replicas<ReplicaRequest> replicas,
Logger logger, String opType, long primaryTerm) {
Logger logger, ThreadPool threadPool, String opType, long primaryTerm, TimeValue initialRetryBackoffBound,
TimeValue retryTimeout) {
this.replicasProxy = replicas;
this.primary = primary;
this.resultListener = listener;
this.logger = logger;
this.threadPool = threadPool;
this.request = request;
this.opType = opType;
this.primaryTerm = primaryTerm;
this.initialRetryBackoffBound = initialRetryBackoffBound;
this.retryTimeout = retryTimeout;
}
public void execute() throws Exception {
@ -130,8 +143,9 @@ public class ReplicationOperation<
final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
final ReplicationGroup replicationGroup = primary.getReplicationGroup();
final PendingReplicationActions pendingReplicationActions = primary.getPendingReplicationActions();
markUnavailableShardsAsStale(replicaRequest, replicationGroup);
performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup, pendingReplicationActions);
}
primaryResult.runPostReplicationActions(new ActionListener<Void>() {
@ -165,7 +179,8 @@ public class ReplicationOperation<
}
private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes, final ReplicationGroup replicationGroup) {
final long maxSeqNoOfUpdatesOrDeletes, final ReplicationGroup replicationGroup,
final PendingReplicationActions pendingReplicationActions) {
// for total stats, add number of unassigned shards and
// number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target)
totalShards.addAndGet(replicationGroup.getSkippedShards().size());
@ -174,52 +189,78 @@ public class ReplicationOperation<
for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
if (shard.isSameAllocation(primaryRouting) == false) {
performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, pendingReplicationActions);
}
}
}
private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest,
final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) {
final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes,
final PendingReplicationActions pendingReplicationActions) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
}
totalShards.incrementAndGet();
pendingActions.incrementAndGet();
replicasProxy.performOn(shard, replicaRequest, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes,
new ActionListener<ReplicaResponse>() {
@Override
public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
try {
updateCheckPoints(shard, response::localCheckpoint, response::globalCheckpoint);
} finally {
decPendingAndFinishIfNeeded();
}
final ActionListener<ReplicaResponse> replicationListener = new ActionListener<ReplicaResponse>() {
@Override
public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
try {
updateCheckPoints(shard, response::localCheckpoint, response::globalCheckpoint);
} finally {
decPendingAndFinishIfNeeded();
}
}
@Override
public void onFailure(Exception replicaException) {
logger.trace(() -> new ParameterizedMessage(
"[{}] failure while performing [{}] on replica {}, request [{}]",
shard.shardId(), opType, shard, replicaRequest), replicaException);
// Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
if (TransportActions.isShardNotAvailableException(replicaException) == false) {
RestStatus restStatus = ExceptionsHelper.status(replicaException);
shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
}
String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
replicasProxy.failShardIfNeeded(shard, primaryTerm, message, replicaException,
ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
@Override
public void onFailure(Exception replicaException) {
logger.trace(() -> new ParameterizedMessage(
"[{}] failure while performing [{}] on replica {}, request [{}]",
shard.shardId(), opType, shard, replicaRequest), replicaException);
// Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
if (TransportActions.isShardNotAvailableException(replicaException) == false) {
RestStatus restStatus = ExceptionsHelper.status(replicaException);
shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
}
String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
replicasProxy.failShardIfNeeded(shard, primaryTerm, message, replicaException,
ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
}
@Override
public String toString() {
return "[" + replicaRequest + "][" + shard + "]";
}
});
@Override
public String toString() {
return "[" + replicaRequest + "][" + shard + "]";
}
};
final String allocationId = shard.allocationId().getId();
final RetryableAction<ReplicaResponse> replicationAction = new RetryableAction<ReplicaResponse>(logger, threadPool,
initialRetryBackoffBound, retryTimeout, replicationListener) {
@Override
public void tryAction(ActionListener<ReplicaResponse> listener) {
replicasProxy.performOn(shard, replicaRequest, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener);
}
@Override
public void onFinished() {
super.onFinished();
pendingReplicationActions.removeReplicationAction(allocationId, this);
}
@Override
public boolean shouldRetry(Exception e) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
return cause instanceof CircuitBreakingException ||
cause instanceof EsRejectedExecutionException ||
cause instanceof ConnectTransportException;
}
};
pendingReplicationActions.addPendingAction(allocationId, replicationAction);
replicationAction.run();
}
private void updateCheckPoints(ShardRouting shard, LongSupplier localCheckpointSupplier, LongSupplier globalCheckpointSupplier) {
@ -396,6 +437,13 @@ public class ReplicationOperation<
* @return the replication group
*/
ReplicationGroup getReplicationGroup();
/**
* Returns the pending replication actions on the primary shard
*
* @return the pending replication actions
*/
PendingReplicationActions getPendingReplicationActions();
}
/**

View File

@ -51,6 +51,8 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -96,6 +98,21 @@ public abstract class TransportReplicationAction<
Response extends ReplicationResponse
> extends TransportAction<Request, Response> {
/**
* The timeout for retrying replication requests.
*/
public static final Setting<TimeValue> REPLICATION_RETRY_TIMEOUT =
Setting.timeSetting("indices.replication.retry_timeout", TimeValue.timeValueSeconds(0), Setting.Property.Dynamic,
Setting.Property.NodeScope);
/**
* The maximum bound for the first retry backoff for failed replication operations. The backoff bound
* will increase exponential if failures continue.
*/
public static final Setting<TimeValue> REPLICATION_INITIAL_RETRY_BACKOFF_BOUND =
Setting.timeSetting("indices.replication.initial_retry_backoff_bound", TimeValue.timeValueMillis(50), TimeValue.timeValueMillis(10),
Setting.Property.Dynamic, Setting.Property.NodeScope);
protected final ThreadPool threadPool;
protected final TransportService transportService;
protected final ClusterService clusterService;
@ -109,6 +126,8 @@ public abstract class TransportReplicationAction<
protected final String transportPrimaryAction;
private final boolean syncGlobalCheckpointAfterOperation;
private volatile TimeValue initialRetryBackoffBound;
private volatile TimeValue retryTimeout;
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
@ -137,6 +156,9 @@ public abstract class TransportReplicationAction<
this.transportPrimaryAction = actionName + "[p]";
this.transportReplicaAction = actionName + "[r]";
this.initialRetryBackoffBound = REPLICATION_INITIAL_RETRY_BACKOFF_BOUND.get(settings);
this.retryTimeout = REPLICATION_RETRY_TIMEOUT.get(settings);
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);
transportService.registerRequestHandler(transportPrimaryAction, executor, forceExecutionOnPrimary, true,
@ -149,6 +171,10 @@ public abstract class TransportReplicationAction<
this.transportOptions = transportOptions(settings);
this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation;
ClusterSettings clusterSettings = clusterService.getClusterSettings();
clusterSettings.addSettingsUpdateConsumer(REPLICATION_INITIAL_RETRY_BACKOFF_BOUND, (v) -> initialRetryBackoffBound = v);
clusterSettings.addSettingsUpdateConsumer(REPLICATION_RETRY_TIMEOUT, (v) -> retryTimeout = v);
}
@Override
@ -371,7 +397,9 @@ public abstract class TransportReplicationAction<
new ReplicationOperation<>(primaryRequest.getRequest(), primaryShardReference,
ActionListener.map(responseListener, result -> result.finalResponseIfSuccessful),
newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm()).execute();
newReplicasProxy(), logger, threadPool, actionName, primaryRequest.getPrimaryTerm(), initialRetryBackoffBound,
retryTimeout)
.execute();
}
} catch (Exception e) {
handleException(primaryShardReference, e);
@ -396,10 +424,6 @@ public abstract class TransportReplicationAction<
}
protected ActionListener<Response> wrapResponseActionListener(ActionListener<Response> listener, IndexShard shard) {
return listener;
}
public static class PrimaryResult<ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
Response extends ReplicationResponse>
implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
@ -929,6 +953,11 @@ public abstract class TransportReplicationAction<
public ReplicationGroup getReplicationGroup() {
return indexShard.getReplicationGroup();
}
@Override
public PendingReplicationActions getPendingReplicationActions() {
return indexShard.getPendingReplicationActions();
}
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.bootstrap.BootstrapSettings;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
@ -347,6 +348,8 @@ public final class ClusterSettings extends AbstractScopedSettings {
NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING,
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING,
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING,
TransportReplicationAction.REPLICATION_INITIAL_RETRY_BACKOFF_BOUND,
TransportReplicationAction.REPLICATION_RETRY_TIMEOUT,
TransportSettings.HOST,
TransportSettings.PUBLISH_HOST,
TransportSettings.PUBLISH_HOST_PROFILE,

View File

@ -55,6 +55,7 @@ import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
@ -223,6 +224,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
*/
private final double fileBasedRecoveryThreshold;
private final Consumer<ReplicationGroup> onReplicationGroupUpdated;
/**
* Get all retention leases tracked on this shard.
*
@ -870,16 +873,31 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
return value.isPresent() ? value.getAsLong() : SequenceNumbers.UNASSIGNED_SEQ_NO;
}
public ReplicationTracker(
final ShardId shardId,
final String allocationId,
final IndexSettings indexSettings,
final long operationPrimaryTerm,
final long globalCheckpoint,
final LongConsumer onGlobalCheckpointUpdated,
final LongSupplier currentTimeMillisSupplier,
final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases,
final Supplier<SafeCommitInfo> safeCommitInfoSupplier) {
this(shardId, allocationId, indexSettings, operationPrimaryTerm, globalCheckpoint, onGlobalCheckpointUpdated,
currentTimeMillisSupplier, onSyncRetentionLeases, safeCommitInfoSupplier, x -> {});
}
/**
* Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or
* {@link SequenceNumbers#UNASSIGNED_SEQ_NO}.
*
* @param shardId the shard ID
* @param allocationId the allocation ID
* @param indexSettings the index settings
* @param operationPrimaryTerm the current primary term
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
* @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires
* @param shardId the shard ID
* @param allocationId the allocation ID
* @param indexSettings the index settings
* @param operationPrimaryTerm the current primary term
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
* @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires
* @param onReplicationGroupUpdated a callback when the replica group changes
*/
public ReplicationTracker(
final ShardId shardId,
@ -890,7 +908,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
final LongConsumer onGlobalCheckpointUpdated,
final LongSupplier currentTimeMillisSupplier,
final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases,
final Supplier<SafeCommitInfo> safeCommitInfoSupplier) {
final Supplier<SafeCommitInfo> safeCommitInfoSupplier,
final Consumer<ReplicationGroup> onReplicationGroupUpdated) {
super(shardId, indexSettings);
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
this.shardAllocationId = allocationId;
@ -912,6 +931,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
indexSettings.getIndexMetadata().getState() == IndexMetadata.State.OPEN);
this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings());
this.safeCommitInfoSupplier = safeCommitInfoSupplier;
this.onReplicationGroupUpdated = onReplicationGroupUpdated;
assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
assert invariant();
}
@ -926,10 +946,24 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
return replicationGroup;
}
private void updateReplicationGroupAndNotify() {
assert Thread.holdsLock(this);
ReplicationGroup newReplicationGroup = calculateReplicationGroup();
replicationGroup = newReplicationGroup;
onReplicationGroupUpdated.accept(newReplicationGroup);
}
private ReplicationGroup calculateReplicationGroup() {
long newVersion;
if (replicationGroup == null) {
newVersion = 0;
} else {
newVersion = replicationGroup.getVersion() + 1;
}
return new ReplicationGroup(routingTable,
checkpoints.entrySet().stream().filter(e -> e.getValue().inSync).map(Map.Entry::getKey).collect(Collectors.toSet()),
checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).map(Map.Entry::getKey).collect(Collectors.toSet()));
checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).map(Map.Entry::getKey).collect(Collectors.toSet()),
newVersion);
}
/**
@ -1098,7 +1132,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
}
appliedClusterStateVersion = applyingClusterStateVersion;
this.routingTable = routingTable;
replicationGroup = calculateReplicationGroup();
updateReplicationGroupAndNotify();
if (primaryMode && removedEntries) {
updateGlobalCheckpointOnPrimary();
// notify any waiter for local checkpoint advancement to recheck that their shard is still being tracked.
@ -1124,7 +1158,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
throw new IllegalStateException("no local checkpoint tracking information available");
}
cps.tracked = true;
replicationGroup = calculateReplicationGroup();
updateReplicationGroupAndNotify();
assert invariant();
}
@ -1169,7 +1203,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
}
} else {
cps.inSync = true;
replicationGroup = calculateReplicationGroup();
updateReplicationGroupAndNotify();
logger.trace("marked [{}] as in-sync", allocationId);
updateGlobalCheckpointOnPrimary();
}
@ -1214,7 +1248,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
pendingInSync.remove(allocationId);
pending = false;
cps.inSync = true;
replicationGroup = calculateReplicationGroup();
updateReplicationGroupAndNotify();
logger.trace("marked [{}] as in-sync", allocationId);
notifyAllWaiters();
}
@ -1342,7 +1376,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
checkpoints.put(entry.getKey(), entry.getValue().copy());
}
routingTable = primaryContext.getRoutingTable();
replicationGroup = calculateReplicationGroup();
updateReplicationGroupAndNotify();
updateGlobalCheckpointOnPrimary();
// reapply missed cluster state update
// note that if there was no cluster state update between start of the engine of this shard and the call to

View File

@ -46,6 +46,7 @@ import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
import org.elasticsearch.action.support.replication.PendingReplicationActions;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
@ -213,6 +214,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final SearchOperationListener searchOperationListener;
private final GlobalCheckpointListeners globalCheckpointListeners;
private final PendingReplicationActions pendingReplicationActions;
private final ReplicationTracker replicationTracker;
protected volatile ShardRouting shardRouting;
@ -338,6 +340,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
this.pendingPrimaryTerm = primaryTerm;
this.globalCheckpointListeners =
new GlobalCheckpointListeners(shardId, threadPool.scheduler(), logger);
this.pendingReplicationActions = new PendingReplicationActions(shardId, threadPool);
this.replicationTracker = new ReplicationTracker(
shardId,
aId,
@ -347,7 +350,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
globalCheckpointListeners::globalCheckpointUpdated,
threadPool::absoluteTimeInMillis,
(retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, aId, getPendingPrimaryTerm(), retentionLeases, listener),
this::getSafeCommitInfo);
this::getSafeCommitInfo,
pendingReplicationActions);
// the query cache is a node-level thing, however we want the most popular filters
// to be computed on a per-shard basis
@ -1350,7 +1354,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
} finally {
// playing safe here and close the engine even if the above succeeds - close can be called multiple times
// Also closing refreshListeners to prevent us from accumulating any more listeners
IOUtils.close(engine, globalCheckpointListeners, refreshListeners);
IOUtils.close(engine, globalCheckpointListeners, refreshListeners, pendingReplicationActions);
indexShardOperationPermits.close();
}
}
@ -2386,7 +2390,22 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
public ReplicationGroup getReplicationGroup() {
assert assertPrimaryMode();
verifyNotClosed();
return replicationTracker.getReplicationGroup();
ReplicationGroup replicationGroup = replicationTracker.getReplicationGroup();
// PendingReplicationActions is dependent on ReplicationGroup. Every time we expose ReplicationGroup,
// ensure PendingReplicationActions is updated with the newest version to prevent races.
pendingReplicationActions.accept(replicationGroup);
return replicationGroup;
}
/**
* Returns the pending replication actions for the shard.
*
* @return the pending replication actions
*/
public PendingReplicationActions getPendingReplicationActions() {
assert assertPrimaryMode();
verifyNotClosed();
return pendingReplicationActions;
}
/**

View File

@ -34,15 +34,18 @@ public class ReplicationGroup {
private final IndexShardRoutingTable routingTable;
private final Set<String> inSyncAllocationIds;
private final Set<String> trackedAllocationIds;
private final long version;
private final Set<String> unavailableInSyncShards; // derived from the other fields
private final List<ShardRouting> replicationTargets; // derived from the other fields
private final List<ShardRouting> skippedShards; // derived from the other fields
public ReplicationGroup(IndexShardRoutingTable routingTable, Set<String> inSyncAllocationIds, Set<String> trackedAllocationIds) {
public ReplicationGroup(IndexShardRoutingTable routingTable, Set<String> inSyncAllocationIds, Set<String> trackedAllocationIds,
long version) {
this.routingTable = routingTable;
this.inSyncAllocationIds = inSyncAllocationIds;
this.trackedAllocationIds = trackedAllocationIds;
this.version = version;
this.unavailableInSyncShards = Sets.difference(inSyncAllocationIds, routingTable.getAllAllocationIds());
this.replicationTargets = new ArrayList<>();
@ -73,6 +76,10 @@ public class ReplicationGroup {
}
}
public long getVersion() {
return version;
}
public IndexShardRoutingTable getRoutingTable() {
return routingTable;
}
@ -81,6 +88,10 @@ public class ReplicationGroup {
return inSyncAllocationIds;
}
public Set<String> getTrackedAllocationIds() {
return trackedAllocationIds;
}
/**
* Returns the set of shard allocation ids that are in the in-sync set but have no assigned routing entry
*/

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.PendingReplicationActions;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
@ -40,6 +41,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
@ -229,7 +231,7 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
unavailableShards.forEach(shardRoutingTableBuilder::removeShard);
shardRoutingTable = shardRoutingTableBuilder.build();
final ReplicationGroup replicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards);
final ReplicationGroup replicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards, 0);
assertThat(replicationGroup.getUnavailableInSyncShards().size(), greaterThan(0));
final PlainActionFuture<PrimaryResult> listener = new PlainActionFuture<>();
@ -239,7 +241,8 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
ReplicationOperation.Replicas<TransportVerifyShardBeforeCloseAction.ShardRequest> proxy = action.newReplicasProxy();
ReplicationOperation<TransportVerifyShardBeforeCloseAction.ShardRequest,
TransportVerifyShardBeforeCloseAction.ShardRequest, PrimaryResult> operation = new ReplicationOperation<>(
request, createPrimary(primaryRouting, replicationGroup), listener, proxy, logger, "test", primaryTerm);
request, createPrimary(primaryRouting, replicationGroup), listener, proxy, logger, threadPool, "test", primaryTerm,
TimeValue.timeValueMillis(20), TimeValue.timeValueSeconds(60));
operation.execute();
final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
@ -276,13 +279,23 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
TransportVerifyShardBeforeCloseAction.ShardRequest,
PrimaryResult>
createPrimary(final ShardRouting primary, final ReplicationGroup replicationGroup) {
return new ReplicationOperation.Primary<TransportVerifyShardBeforeCloseAction.ShardRequest,
TransportVerifyShardBeforeCloseAction.ShardRequest, PrimaryResult>() {
final PendingReplicationActions replicationActions = new PendingReplicationActions(primary.shardId(), threadPool);
replicationActions.accept(replicationGroup);
return new ReplicationOperation.Primary<
TransportVerifyShardBeforeCloseAction.ShardRequest,
TransportVerifyShardBeforeCloseAction.ShardRequest,
PrimaryResult>() {
@Override
public ShardRouting routingEntry() {
return primary;
}
@Override
public PendingReplicationActions getPendingReplicationActions() {
return replicationActions;
}
@Override
public ReplicationGroup getReplicationGroup() {
return replicationGroup;

View File

@ -134,7 +134,7 @@ public class TransportResyncReplicationActionTests extends ESTestCase {
when(indexShard.getReplicationGroup()).thenReturn(
new ReplicationGroup(shardRoutingTable,
clusterService.state().metadata().index(index).inSyncAllocationIds(shardId.id()),
shardRoutingTable.getAllAllocationIds()));
shardRoutingTable.getAllAllocationIds(), 0));
final IndexService indexService = mock(IndexService.class);
when(indexService.getShard(eq(shardId.id()))).thenReturn(indexShard);

View File

@ -149,6 +149,30 @@ public class RetryableActionTests extends ESTestCase {
expectThrows(EsRejectedExecutionException.class, future::actionGet);
}
public void testTimeoutOfZeroMeansNoRetry() {
final AtomicInteger executedCount = new AtomicInteger();
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
final RetryableAction<Boolean> retryableAction = new RetryableAction<Boolean>(logger, taskQueue.getThreadPool(),
TimeValue.timeValueMillis(10), TimeValue.timeValueSeconds(0), future) {
@Override
public void tryAction(ActionListener<Boolean> listener) {
executedCount.getAndIncrement();
throw new EsRejectedExecutionException();
}
@Override
public boolean shouldRetry(Exception e) {
return e instanceof EsRejectedExecutionException;
}
};
retryableAction.run();
taskQueue.runAllRunnableTasks();
assertEquals(1, executedCount.get());
expectThrows(EsRejectedExecutionException.class, future::actionGet);
}
public void testFailedBecauseNotRetryable() {
final AtomicInteger executedCount = new AtomicInteger();
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();

View File

@ -0,0 +1,123 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.replication;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RetryableAction;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import java.util.Collections;
public class PendingReplicationActionsTests extends ESTestCase {
private TestThreadPool threadPool;
private ShardId shardId;
private PendingReplicationActions pendingReplication;
@Override
public void setUp() throws Exception {
super.setUp();
shardId = new ShardId("index", UUIDs.randomBase64UUID(), 0);
threadPool = new TestThreadPool(getTestName());
pendingReplication = new PendingReplicationActions(shardId, threadPool);
}
@Override
public void tearDown() throws Exception {
terminate(threadPool);
super.tearDown();
}
public void testAllocationIdActionCanBeRun() {
String allocationId = UUIDs.randomBase64UUID();
PlainActionFuture<Void> future = PlainActionFuture.newFuture();
pendingReplication.acceptNewTrackedAllocationIds(Collections.singleton(allocationId));
TestAction action = new TestAction(future);
pendingReplication.addPendingAction(allocationId, action);
action.run();
future.actionGet();
assertTrue(future.isDone());
}
public void testMissingAllocationIdActionWillBeCancelled() {
String allocationId = UUIDs.randomBase64UUID();
PlainActionFuture<Void> future = PlainActionFuture.newFuture();
TestAction action = new TestAction(future);
pendingReplication.addPendingAction(allocationId, action);
expectThrows(IndexShardClosedException.class, future::actionGet);
}
public void testAllocationIdActionWillBeCancelledIfTrackedAllocationChanges() {
String allocationId = UUIDs.randomBase64UUID();
PlainActionFuture<Void> future = PlainActionFuture.newFuture();
pendingReplication.acceptNewTrackedAllocationIds(Collections.singleton(allocationId));
TestAction action = new TestAction(future, false);
pendingReplication.addPendingAction(allocationId, action);
action.run();
pendingReplication.acceptNewTrackedAllocationIds(Collections.emptySet());
expectThrows(IndexShardClosedException.class, future::actionGet);
}
public void testAllocationIdActionWillBeCancelledOnClose() {
String allocationId = UUIDs.randomBase64UUID();
PlainActionFuture<Void> future = PlainActionFuture.newFuture();
pendingReplication.acceptNewTrackedAllocationIds(Collections.singleton(allocationId));
TestAction action = new TestAction(future, false);
pendingReplication.addPendingAction(allocationId, action);
action.run();
pendingReplication.close();
expectThrows(IndexShardClosedException.class, future::actionGet);
}
private class TestAction extends RetryableAction<Void> {
private final boolean succeed;
private final Exception retryable = new Exception();
private TestAction(ActionListener<Void> listener) {
this(listener, true);
}
private TestAction(ActionListener<Void> listener, boolean succeed) {
super(logger, threadPool, TimeValue.timeValueMillis(1), TimeValue.timeValueMinutes(1), listener);
this.succeed = succeed;
}
@Override
public void tryAction(ActionListener<Void> listener) {
if (succeed) {
listener.onResponse(null);
} else {
listener.onFailure(retryable);
}
}
@Override
public boolean shouldRetry(Exception e) {
return retryable == e;
}
}
}

View File

@ -35,7 +35,12 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.shard.IndexShardState;
@ -43,8 +48,13 @@ import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.SendRequestTransportException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -54,6 +64,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@ -67,6 +78,20 @@ import static org.hamcrest.Matchers.nullValue;
public class ReplicationOperationTests extends ESTestCase {
private ThreadPool threadPool;
@Override
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(getTestName());
}
@Override
public void tearDown() throws Exception {
terminate(threadPool);
super.tearDown();
}
public void testReplication() throws Exception {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
@ -92,7 +117,7 @@ public class ReplicationOperationTests extends ESTestCase {
addTrackingInfo(indexShardRoutingTable, primaryShard, trackedShards, untrackedShards);
trackedShards.addAll(staleAllocationIds);
final ReplicationGroup replicationGroup = new ReplicationGroup(indexShardRoutingTable, inSyncAllocationIds, trackedShards);
final ReplicationGroup replicationGroup = new ReplicationGroup(indexShardRoutingTable, inSyncAllocationIds, trackedShards, 0);
final Set<ShardRouting> expectedReplicas = getExpectedReplicas(shardId, initialState, trackedShards);
@ -117,7 +142,7 @@ public class ReplicationOperationTests extends ESTestCase {
PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
final TestReplicaProxy replicasProxy = new TestReplicaProxy(simulatedFailures);
final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup);
final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup, threadPool);
final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy, primaryTerm);
op.execute();
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
@ -141,6 +166,81 @@ public class ReplicationOperationTests extends ESTestCase {
assertThat(primary.knownGlobalCheckpoints, equalTo(replicasProxy.generatedGlobalCheckpoints));
}
public void testRetryTransientReplicationFailure() throws Exception {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
ClusterState initialState = stateWithActivePrimary(index, true, randomInt(5));
IndexMetadata indexMetadata = initialState.getMetadata().index(index);
final long primaryTerm = indexMetadata.primaryTerm(0);
final IndexShardRoutingTable indexShardRoutingTable = initialState.getRoutingTable().shardRoutingTable(shardId);
ShardRouting primaryShard = indexShardRoutingTable.primaryShard();
if (primaryShard.relocating() && randomBoolean()) {
// simulate execution of the replication phase on the relocation target node after relocation source was marked as relocated
initialState = ClusterState.builder(initialState)
.nodes(DiscoveryNodes.builder(initialState.nodes()).localNodeId(primaryShard.relocatingNodeId())).build();
primaryShard = primaryShard.getTargetRelocatingShard();
}
// add a few in-sync allocation ids that don't have corresponding routing entries
final Set<String> staleAllocationIds = Sets.newHashSet(generateRandomStringArray(4, 10, false));
final Set<String> inSyncAllocationIds = Sets.union(indexMetadata.inSyncAllocationIds(0), staleAllocationIds);
final Set<String> trackedShards = new HashSet<>();
final Set<String> untrackedShards = new HashSet<>();
addTrackingInfo(indexShardRoutingTable, primaryShard, trackedShards, untrackedShards);
trackedShards.addAll(staleAllocationIds);
final ReplicationGroup replicationGroup = new ReplicationGroup(indexShardRoutingTable, inSyncAllocationIds, trackedShards, 0);
final Set<ShardRouting> expectedReplicas = getExpectedReplicas(shardId, initialState, trackedShards);
final Map<ShardRouting, Exception> simulatedFailures = new HashMap<>();
for (ShardRouting replica : expectedReplicas) {
Exception cause;
Exception exception;
if (randomBoolean()) {
if (randomBoolean()) {
cause = new CircuitBreakingException("broken", CircuitBreaker.Durability.PERMANENT);
} else {
cause = new EsRejectedExecutionException("rejected");
}
exception = new RemoteTransportException("remote", cause);
} else {
TransportAddress address = new TransportAddress(InetAddress.getLoopbackAddress(), 9300);
DiscoveryNode node = new DiscoveryNode("replica", address, Version.CURRENT);
cause = new ConnectTransportException(node, "broken");
exception = cause;
}
logger.debug("--> simulating failure on {} with [{}]", replica, exception.getClass().getSimpleName());
simulatedFailures.put(replica, exception);
}
Request request = new Request(shardId);
PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
final TestReplicaProxy replicasProxy = new TestReplicaProxy(simulatedFailures, true);
final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup, threadPool);
final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy, primaryTerm,
TimeValue.timeValueMillis(20), TimeValue.timeValueSeconds(60));
op.execute();
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
assertThat(request.processedOnReplicas, equalTo(expectedReplicas));
assertThat(replicasProxy.failedReplicas.size(), equalTo(0));
assertThat(replicasProxy.markedAsStaleCopies, equalTo(staleAllocationIds));
assertThat("post replication operations not run on primary", request.runPostReplicationActionsOnPrimary.get(), equalTo(true));
ShardInfo shardInfo = listener.actionGet().getShardInfo();
assertThat(shardInfo.getSuccessful(), equalTo(1 + expectedReplicas.size()));
final List<ShardRouting> unassignedShards = indexShardRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED);
final int totalShards = 1 + expectedReplicas.size() + unassignedShards.size() + untrackedShards.size();
assertThat(replicationGroup.toString(), shardInfo.getTotal(), equalTo(totalShards));
assertThat(primary.knownLocalCheckpoints.remove(primaryShard.allocationId().getId()), equalTo(primary.localCheckpoint));
assertThat(primary.knownLocalCheckpoints, equalTo(replicasProxy.generatedLocalCheckpoints));
assertThat(primary.knownGlobalCheckpoints.remove(primaryShard.allocationId().getId()), equalTo(primary.globalCheckpoint));
assertThat(primary.knownGlobalCheckpoints, equalTo(replicasProxy.generatedGlobalCheckpoints));
}
private void addTrackingInfo(IndexShardRoutingTable indexShardRoutingTable, ShardRouting primaryShard, Set<String> trackedShards,
Set<String> untrackedShards) {
for (ShardRouting shr : indexShardRoutingTable.shards()) {
@ -187,7 +287,7 @@ public class ReplicationOperationTests extends ESTestCase {
addTrackingInfo(indexShardRoutingTable, primaryShard, trackedShards, new HashSet<>());
trackedShards.addAll(staleAllocationIds);
final ReplicationGroup replicationGroup = new ReplicationGroup(indexShardRoutingTable, inSyncAllocationIds, trackedShards);
final ReplicationGroup replicationGroup = new ReplicationGroup(indexShardRoutingTable, inSyncAllocationIds, trackedShards, 0);
final Set<ShardRouting> expectedReplicas = getExpectedReplicas(shardId, initialState, trackedShards);
@ -234,7 +334,7 @@ public class ReplicationOperationTests extends ESTestCase {
}
};
AtomicBoolean primaryFailed = new AtomicBoolean();
final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup) {
final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup, threadPool) {
@Override
public void failShard(String message, Exception exception) {
assertThat(exception, instanceOf(ShardStateAction.NoLongerPrimaryShardException.class));
@ -263,7 +363,7 @@ public class ReplicationOperationTests extends ESTestCase {
IndexShardRoutingTable shardRoutingTable = initialState.getRoutingTable().shardRoutingTable(shardId);
Set<String> trackedShards = new HashSet<>();
addTrackingInfo(shardRoutingTable, null, trackedShards, new HashSet<>());
ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards);
ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards, 0);
final ClusterState stateWithAddedReplicas;
if (randomBoolean()) {
@ -278,13 +378,13 @@ public class ReplicationOperationTests extends ESTestCase {
trackedShards = new HashSet<>();
addTrackingInfo(shardRoutingTable, null, trackedShards, new HashSet<>());
ReplicationGroup updatedReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards);
ReplicationGroup updatedReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards, 0);
final AtomicReference<ReplicationGroup> replicationGroup = new AtomicReference<>(initialReplicationGroup);
logger.debug("--> using initial replicationGroup:\n{}", replicationGroup.get());
final long primaryTerm = initialState.getMetadata().index(shardId.getIndexName()).primaryTerm(shardId.id());
final ShardRouting primaryShard = updatedReplicationGroup.getRoutingTable().primaryShard();
final TestPrimary primary = new TestPrimary(primaryShard, replicationGroup::get) {
final TestPrimary primary = new TestPrimary(primaryShard, replicationGroup::get, threadPool) {
@Override
public void perform(Request request, ActionListener<Result> listener) {
super.perform(request, ActionListener.map(listener, result -> {
@ -336,13 +436,13 @@ public class ReplicationOperationTests extends ESTestCase {
final Set<String> inSyncAllocationIds = state.metadata().index(index).inSyncAllocationIds(0);
Set<String> trackedShards = new HashSet<>();
addTrackingInfo(shardRoutingTable, null, trackedShards, new HashSet<>());
final ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards);
final ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards, 0);
PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
final ShardRouting primaryShard = shardRoutingTable.primaryShard();
final TestReplicationOperation op = new TestReplicationOperation(request,
new TestPrimary(primaryShard, () -> initialReplicationGroup),
listener, new TestReplicaProxy(), logger, "test", primaryTerm);
new TestPrimary(primaryShard, () -> initialReplicationGroup, threadPool),
listener, new TestReplicaProxy(), logger, threadPool, "test", primaryTerm);
if (passesActiveShardCheck) {
assertThat(op.checkActiveShardCount(), nullValue());
@ -372,12 +472,12 @@ public class ReplicationOperationTests extends ESTestCase {
final Set<String> inSyncAllocationIds = indexMetadata.inSyncAllocationIds(0);
final IndexShardRoutingTable shardRoutingTable = state.routingTable().index(index).shard(shardId.id());
final Set<String> trackedShards = shardRoutingTable.getAllAllocationIds();
final ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards);
final ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards, 0);
final boolean fatal = randomBoolean();
final AtomicBoolean primaryFailed = new AtomicBoolean();
final ReplicationOperation.Primary<Request, Request, TestPrimary.Result> primary =
new TestPrimary(primaryRouting, () -> initialReplicationGroup) {
new TestPrimary(primaryRouting, () -> initialReplicationGroup, threadPool) {
@Override
public void failShard(String message, Exception exception) {
@ -460,15 +560,17 @@ public class ReplicationOperationTests extends ESTestCase {
final long globalCheckpoint;
final long maxSeqNoOfUpdatesOrDeletes;
final Supplier<ReplicationGroup> replicationGroupSupplier;
final PendingReplicationActions pendingReplicationActions;
final Map<String, Long> knownLocalCheckpoints = new HashMap<>();
final Map<String, Long> knownGlobalCheckpoints = new HashMap<>();
TestPrimary(ShardRouting routing, Supplier<ReplicationGroup> replicationGroupSupplier) {
TestPrimary(ShardRouting routing, Supplier<ReplicationGroup> replicationGroupSupplier, ThreadPool threadPool) {
this.routing = routing;
this.replicationGroupSupplier = replicationGroupSupplier;
this.localCheckpoint = random().nextLong();
this.globalCheckpoint = randomNonNegativeLong();
this.maxSeqNoOfUpdatesOrDeletes = randomNonNegativeLong();
this.pendingReplicationActions = new PendingReplicationActions(routing.shardId(), threadPool);
}
@Override
@ -555,6 +657,11 @@ public class ReplicationOperationTests extends ESTestCase {
return replicationGroupSupplier.get();
}
@Override
public PendingReplicationActions getPendingReplicationActions() {
pendingReplicationActions.accept(getReplicationGroup());
return pendingReplicationActions;
}
}
static class ReplicaResponse implements ReplicationOperation.ReplicaResponse {
@ -580,7 +687,10 @@ public class ReplicationOperationTests extends ESTestCase {
static class TestReplicaProxy implements ReplicationOperation.Replicas<Request> {
private final int attemptsBeforeSuccess;
private final AtomicInteger attemptsNumber = new AtomicInteger(0);
final Map<ShardRouting, Exception> opFailures;
private final boolean retryable;
final Set<ShardRouting> failedReplicas = ConcurrentCollections.newConcurrentSet();
@ -595,7 +705,17 @@ public class ReplicationOperationTests extends ESTestCase {
}
TestReplicaProxy(Map<ShardRouting, Exception> opFailures) {
this(opFailures, false);
}
TestReplicaProxy(Map<ShardRouting, Exception> opFailures, boolean retryable) {
this.opFailures = opFailures;
this.retryable = retryable;
if (retryable) {
attemptsBeforeSuccess = randomInt(2) + 1;
} else {
attemptsBeforeSuccess = Integer.MAX_VALUE;
}
}
@Override
@ -606,9 +726,19 @@ public class ReplicationOperationTests extends ESTestCase {
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
assertTrue("replica request processed twice on [" + replica + "]", request.processedOnReplicas.add(replica));
assertFalse("primary post replication actions should run after replication", request.runPostReplicationActionsOnPrimary.get());
if (opFailures.containsKey(replica)) {
boolean added = request.processedOnReplicas.add(replica);
if (retryable == false) {
assertTrue("replica request processed twice on [" + replica + "]", added);
}
// If replication is not retryable OR this is the first attempt, the post replication actions
// should not have run.
if (retryable == false || added) {
assertFalse("primary post replication actions should run after replication",
request.runPostReplicationActionsOnPrimary.get());
}
// If this is a retryable scenario and this is the second try, we finish successfully
int n = attemptsNumber.incrementAndGet();
if (opFailures.containsKey(replica) && n <= attemptsBeforeSuccess) {
listener.onFailure(opFailures.get(replica));
} else {
final long generatedLocalCheckpoint = random().nextLong();
@ -643,15 +773,31 @@ public class ReplicationOperationTests extends ESTestCase {
}
class TestReplicationOperation extends ReplicationOperation<Request, Request, TestPrimary.Result> {
TestReplicationOperation(Request request, Primary<Request, Request, TestPrimary.Result> primary,
ActionListener<TestPrimary.Result> listener, Replicas<Request> replicas, long primaryTerm) {
this(request, primary, listener, replicas, ReplicationOperationTests.this.logger, "test", primaryTerm);
ActionListener<TestPrimary.Result> listener, Replicas<Request> replicas, long primaryTerm,
TimeValue initialRetryBackoffBound, TimeValue retryTimeout) {
this(request, primary, listener, replicas, ReplicationOperationTests.this.logger, threadPool, "test", primaryTerm,
initialRetryBackoffBound, retryTimeout);
}
TestReplicationOperation(Request request, Primary<Request, Request, TestPrimary.Result> primary,
ActionListener<TestPrimary.Result> listener, Replicas<Request> replicas, long primaryTerm) {
this(request, primary, listener, replicas, ReplicationOperationTests.this.logger, threadPool, "test", primaryTerm);
}
TestReplicationOperation(Request request, Primary<Request, Request, TestPrimary.Result> primary,
ActionListener<TestPrimary.Result> listener,
Replicas<Request> replicas, Logger logger, String opType, long primaryTerm) {
super(request, primary, listener, replicas, logger, opType, primaryTerm);
Replicas<Request> replicas, Logger logger, ThreadPool threadPool, String opType, long primaryTerm) {
this(request, primary, listener, replicas, logger, threadPool, opType, primaryTerm, TimeValue.timeValueMillis(50),
TimeValue.timeValueSeconds(1));
}
TestReplicationOperation(Request request, Primary<Request, Request, TestPrimary.Result> primary,
ActionListener<TestPrimary.Result> listener,
Replicas<Request> replicas, Logger logger, ThreadPool threadPool, String opType, long primaryTerm,
TimeValue initialRetryBackoffBound, TimeValue retryTimeout) {
super(request, primary, listener, replicas, logger, threadPool, opType, primaryTerm, initialRetryBackoffBound, retryTimeout);
}
}

View File

@ -828,10 +828,12 @@ public class TransportReplicationActionTests extends ESTestCase {
IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().shardRoutingTable(shardId);
Set<String> inSyncIds = randomBoolean() ? singleton(routingEntry.allocationId().getId()) :
clusterService.state().metadata().index(index).inSyncAllocationIds(0);
ReplicationGroup replicationGroup = new ReplicationGroup(shardRoutingTable, inSyncIds, shardRoutingTable.getAllAllocationIds(), 0);
when(shard.getReplicationGroup()).thenReturn(
new ReplicationGroup(shardRoutingTable,
inSyncIds,
shardRoutingTable.getAllAllocationIds()));
replicationGroup);
PendingReplicationActions replicationActions = new PendingReplicationActions(shardId, threadPool);
replicationActions.accept(replicationGroup);
when(shard.getPendingReplicationActions()).thenReturn(replicationActions);
doAnswer(invocation -> {
count.incrementAndGet();
//noinspection unchecked

View File

@ -359,7 +359,7 @@ public class TransportWriteActionTests extends ESTestCase {
protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentFailureOnReplica) {
super(Settings.EMPTY, "internal:test",
new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
x -> null, null, Collections.emptySet()), null, null, null, null,
x -> null, null, Collections.emptySet()), TransportWriteActionTests.this.clusterService, null, null, null,
new ActionFilters(new HashSet<>()), TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false);
this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;

View File

@ -39,6 +39,7 @@ import org.elasticsearch.action.resync.TransportResyncReplicationAction;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.PendingReplicationActions;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequest;
@ -63,6 +64,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
@ -608,7 +610,8 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
adaptResponse(result.finalResponse, getPrimaryShard());
return result.finalResponse;
}),
new ReplicasRef(), logger, opType, primaryTerm)
new ReplicasRef(), logger, threadPool, opType, primaryTerm, TimeValue.timeValueMillis(20),
TimeValue.timeValueSeconds(60))
.execute();
} catch (Exception e) {
listener.onFailure(e);
@ -680,6 +683,10 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
return getPrimaryShard().getReplicationGroup();
}
@Override
public PendingReplicationActions getPendingReplicationActions() {
return getPrimaryShard().getPendingReplicationActions();
}
}
class ReplicasRef implements ReplicationOperation.Replicas<ReplicaRequest> {

View File

@ -37,6 +37,7 @@ import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExc
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.ClusterName;
@ -525,6 +526,12 @@ public final class InternalTestCluster extends TestCluster {
builder.put(ScriptService.SCRIPT_GENERAL_CACHE_EXPIRE_SETTING.getKey(),
timeValueMillis(RandomNumbers.randomIntBetween(random, 750, 10000000)).getStringRep());
}
if (random.nextBoolean()) {
int initialMillisBound = RandomNumbers.randomIntBetween(random,10, 100);
builder.put(TransportReplicationAction.REPLICATION_INITIAL_RETRY_BACKOFF_BOUND.getKey(), timeValueMillis(initialMillisBound));
int retryTimeoutSeconds = RandomNumbers.randomIntBetween(random, 0, 60);
builder.put(TransportReplicationAction.REPLICATION_RETRY_TIMEOUT.getKey(), timeValueSeconds(retryTimeoutSeconds));
}
return builder.build();
}