From c151a75dfe065099289aa14e26cf3e7507d6c4b4 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 21 Nov 2019 14:57:38 -0500 Subject: [PATCH] Use retention lease in peer recovery of closed indices (#48430) Today we do not use retention leases in peer recovery for closed indices because we can't sync retention leases on closed indices. This change allows that ability and adjusts peer recovery to use retention leases for all indices with soft-deletes enabled. Relates #45136 Co-authored-by: David Turner --- .../util/concurrent/AbstractAsyncTask.java | 2 +- .../index/seqno/ReplicationTracker.java | 64 ++++++------- .../RetentionLeaseBackgroundSyncAction.java | 90 ++++++++++++------- .../index/seqno/RetentionLeaseSyncAction.java | 85 +++++++++++------- .../index/seqno/RetentionLeaseSyncer.java | 68 ++++++++------ .../elasticsearch/index/shard/IndexShard.java | 7 +- .../elasticsearch/indices/IndicesModule.java | 6 ++ .../cluster/IndicesClusterStateService.java | 24 +---- .../recovery/RecoverySourceHandler.java | 18 ++-- .../gateway/ReplicaShardAllocatorIT.java | 34 +++++++ ...tentionLeaseBackgroundSyncActionTests.java | 82 ----------------- .../seqno/RetentionLeaseSyncActionTests.java | 75 +--------------- .../snapshots/SnapshotResiliencyTests.java | 22 +---- .../coordination/DeterministicTaskQueue.java | 2 +- .../ESIndexLevelReplicationTestCase.java | 18 ++-- .../xpack/ccr/FollowIndexSecurityIT.java | 2 +- 16 files changed, 250 insertions(+), 349 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java index 1ef9a484a27..58e48f02155 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java @@ -91,7 +91,7 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable { if (logger.isTraceEnabled()) { logger.trace("scheduling {} every {}", toString(), interval); } - cancellable = threadPool.schedule(this, interval, getThreadPool()); + cancellable = threadPool.schedule(threadPool.preserveContext(this), interval, getThreadPool()); isScheduledOrRunning = true; } else { logger.trace("scheduled {} disabled", toString()); diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index f4aa145620c..81061c05805 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -838,10 +838,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L assert checkpoints.get(aId) != null : "aId [" + aId + "] is pending in sync but isn't tracked"; } - if (primaryMode - && indexSettings.isSoftDeleteEnabled() - && indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN - && hasAllPeerRecoveryRetentionLeases) { + if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) { // all tracked shard copies have a corresponding peer-recovery retention lease for (final ShardRouting shardRouting : routingTable.assignedShards()) { if (checkpoints.get(shardRouting.allocationId().getId()).tracked) { @@ -909,7 +906,9 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; - this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0); + this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_6_0) || + (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) && + indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN); this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings()); this.safeCommitInfoSupplier = safeCommitInfoSupplier; assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false; @@ -1022,34 +1021,32 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L assert primaryMode; assert Thread.holdsLock(this); - if (indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN) { - final ShardRouting primaryShard = routingTable.primaryShard(); - final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard); - if (retentionLeases.get(leaseId) == null) { - if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) { - assert primaryShard.allocationId().getId().equals(shardAllocationId) - : routingTable.assignedShards() + " vs " + shardAllocationId; - // Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication - // group. - logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId); - innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1), - PEER_RECOVERY_RETENTION_LEASE_SOURCE); - hasAllPeerRecoveryRetentionLeases = true; - } else { - /* - * We got here here via a rolling upgrade from an older version that doesn't create peer recovery retention - * leases for every shard copy, but in this case we do not expect any leases to exist. - */ - assert hasAllPeerRecoveryRetentionLeases == false : routingTable + " vs " + retentionLeases; - logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases); - } - } else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards().stream().allMatch(shardRouting -> - retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) - || checkpoints.get(shardRouting.allocationId().getId()).tracked == false)) { - // Although this index is old enough not to have all the expected peer recovery retention leases, in fact it does, so we - // don't need to do any more work. + final ShardRouting primaryShard = routingTable.primaryShard(); + final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard); + if (retentionLeases.get(leaseId) == null) { + if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) { + assert primaryShard.allocationId().getId().equals(shardAllocationId) + : routingTable.assignedShards() + " vs " + shardAllocationId; + // Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication + // group. + logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId); + innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1), + PEER_RECOVERY_RETENTION_LEASE_SOURCE); hasAllPeerRecoveryRetentionLeases = true; + } else { + /* + * We got here here via a rolling upgrade from an older version that doesn't create peer recovery retention + * leases for every shard copy, but in this case we do not expect any leases to exist. + */ + assert hasAllPeerRecoveryRetentionLeases == false : routingTable + " vs " + retentionLeases; + logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases); } + } else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards().stream().allMatch(shardRouting -> + retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) + || checkpoints.get(shardRouting.allocationId().getId()).tracked == false)) { + // Although this index is old enough not to have all the expected peer recovery retention leases, in fact it does, so we + // don't need to do any more work. + hasAllPeerRecoveryRetentionLeases = true; } } @@ -1371,10 +1368,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L * prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases. */ public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener listener) { - if (indexSettings().isSoftDeleteEnabled() - && indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN - && hasAllPeerRecoveryRetentionLeases == false) { - + if (indexSettings().isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases == false) { final List shardRoutings = routingTable.assignedShards(); final GroupedActionListener groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> { setHasAllPeerRecoveryRetentionLeases(); diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java index 661e095025f..1f6dd80a9f8 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.action.support.replication.ReplicationTask; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.service.ClusterService; @@ -36,17 +37,21 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Map; import java.util.Objects; /** @@ -60,8 +65,7 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi RetentionLeaseBackgroundSyncAction.Request, ReplicationResponse> { - public static String ACTION_NAME = "indices:admin/seq_no/retention_lease_background_sync"; - + public static final String ACTION_NAME = "indices:admin/seq_no/retention_lease_background_sync"; private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class); protected Logger getLogger() { @@ -91,37 +95,50 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi ThreadPool.Names.MANAGEMENT); } - /** - * Background sync the specified retention leases for the specified shard. - * - * @param shardId the shard to sync - * @param retentionLeases the retention leases to sync - */ - public void backgroundSync( - final ShardId shardId, - final RetentionLeases retentionLeases) { - Objects.requireNonNull(shardId); - Objects.requireNonNull(retentionLeases); - final ThreadContext threadContext = threadPool.getThreadContext(); - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - // we have to execute under the system context so that if security is enabled the sync is authorized - threadContext.markAsSystemContext(); - execute( - new Request(shardId, retentionLeases), - ActionListener.wrap( - r -> {}, - e -> { - if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) { - // node shutting down - return; - } - if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null) { - // the shard is closed - return; - } - getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e); - })); - } + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + assert false : "use RetentionLeaseBackgroundSyncAction#backgroundSync"; + } + + final void backgroundSync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases) { + final Request request = new Request(shardId, retentionLeases); + final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_background_sync", request); + transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction, + new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm), + task, + transportOptions, + new TransportResponseHandler() { + @Override + public ReplicationResponse read(StreamInput in) throws IOException { + return newResponseInstance(in); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public void handleResponse(ReplicationResponse response) { + task.setPhase("finished"); + taskManager.unregister(task); + } + + @Override + public void handleException(TransportException e) { + task.setPhase("finished"); + taskManager.unregister(task); + if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) { + // node shutting down + return; + } + if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null) { + // the shard is closed + return; + } + getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e); + } + }); } @Override @@ -171,6 +188,11 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi retentionLeases.writeTo(out); } + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ReplicationTask(id, type, action, "retention_lease_background_sync shardId=" + shardId, parentTaskId, headers); + } + @Override public String toString() { return "RetentionLeaseBackgroundSyncAction.Request{" + diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index 01f1023e490..44da3318698 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteResponse; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.action.support.replication.ReplicationTask; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -38,16 +39,20 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Map; import java.util.Objects; /** @@ -57,8 +62,7 @@ import java.util.Objects; public class RetentionLeaseSyncAction extends TransportWriteAction { - public static String ACTION_NAME = "indices:admin/seq_no/retention_lease_sync"; - + public static final String ACTION_NAME = "indices:admin/seq_no/retention_lease_sync"; private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class); protected Logger getLogger() { @@ -88,35 +92,47 @@ public class RetentionLeaseSyncAction extends ThreadPool.Names.MANAGEMENT, false); } - /** - * Sync the specified retention leases for the specified shard. The callback is invoked when the sync succeeds or fails. - * - * @param shardId the shard to sync - * @param retentionLeases the retention leases to sync - * @param listener the callback to invoke when the sync completes normally or abnormally - */ - public void sync( - final ShardId shardId, - final RetentionLeases retentionLeases, - final ActionListener listener) { - Objects.requireNonNull(shardId); - Objects.requireNonNull(retentionLeases); - Objects.requireNonNull(listener); - final ThreadContext threadContext = threadPool.getThreadContext(); - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - // we have to execute under the system context so that if security is enabled the sync is authorized - threadContext.markAsSystemContext(); - execute( - new RetentionLeaseSyncAction.Request(shardId, retentionLeases), - ActionListener.wrap( - listener::onResponse, - e -> { - if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { - getLogger().warn(new ParameterizedMessage("{} retention lease sync failed", shardId), e); - } - listener.onFailure(e); - })); - } + @Override + protected void doExecute(Task parentTask, Request request, ActionListener listener) { + assert false : "use RetentionLeaseSyncAction#sync"; + } + + final void sync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases, + ActionListener listener) { + final Request request = new Request(shardId, retentionLeases); + final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_sync", request); + transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction, + new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm), + task, + transportOptions, + new TransportResponseHandler() { + @Override + public ReplicationResponse read(StreamInput in) throws IOException { + return newResponseInstance(in); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public void handleResponse(ReplicationResponse response) { + task.setPhase("finished"); + taskManager.unregister(task); + listener.onResponse(response); + } + + @Override + public void handleException(TransportException e) { + if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { + getLogger().warn(new ParameterizedMessage("{} retention lease sync failed", shardId), e); + } + task.setPhase("finished"); + taskManager.unregister(task); + listener.onFailure(e); + } + }); } @Override @@ -172,6 +188,11 @@ public class RetentionLeaseSyncAction extends retentionLeases.writeTo(out); } + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ReplicationTask(id, type, action, "retention_lease_sync shardId=" + shardId, parentTaskId, headers); + } + @Override public String toString() { return "RetentionLeaseSyncAction.Request{" + diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java index 7de6bad3f11..40f80fee2b0 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java @@ -21,36 +21,52 @@ package org.elasticsearch.index.seqno; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.common.inject.Inject; import org.elasticsearch.index.shard.ShardId; -/** - * A functional interface that represents a method for syncing retention leases to replica shards after a new retention lease is added on - * the primary. - */ -public interface RetentionLeaseSyncer { +import java.util.Objects; + +public class RetentionLeaseSyncer { + private final SyncAction syncAction; + private final BackgroundSyncAction backgroundSyncAction; + + @Inject + public RetentionLeaseSyncer(RetentionLeaseSyncAction syncAction, RetentionLeaseBackgroundSyncAction backgroundSyncAction) { + this(syncAction::sync, backgroundSyncAction::backgroundSync); + } + + public RetentionLeaseSyncer(SyncAction syncAction, BackgroundSyncAction backgroundSyncAction) { + this.syncAction = Objects.requireNonNull(syncAction); + this.backgroundSyncAction = Objects.requireNonNull(backgroundSyncAction); + } + + public static final RetentionLeaseSyncer EMPTY = new RetentionLeaseSyncer( + (shardId, primaryAllocationId, primaryTerm, retentionLeases, listener) -> listener.onResponse(new ReplicationResponse()), + (shardId, primaryAllocationId, primaryTerm, retentionLeases) -> { }); + + public void sync(ShardId shardId, String primaryAllocationId, long primaryTerm, + RetentionLeases retentionLeases, ActionListener listener) { + syncAction.sync(shardId, primaryAllocationId, primaryTerm, retentionLeases, listener); + } + + public void backgroundSync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases) { + backgroundSyncAction.backgroundSync(shardId, primaryAllocationId, primaryTerm, retentionLeases); + } /** - * Represents a method that when invoked syncs retention leases to replica shards after a new retention lease is added on the primary. - * The specified listener is invoked when the syncing completes with success or failure. - * - * @param shardId the shard ID - * @param retentionLeases the retention leases to sync - * @param listener the callback when sync completes + * Represents an action that is invoked to sync retention leases to replica shards after a retention lease is added + * or removed on the primary. The specified listener is invoked when the syncing completes with success or failure. */ - void sync(ShardId shardId, RetentionLeases retentionLeases, ActionListener listener); - - void backgroundSync(ShardId shardId, RetentionLeases retentionLeases); - - RetentionLeaseSyncer EMPTY = new RetentionLeaseSyncer() { - @Override - public void sync(final ShardId shardId, final RetentionLeases retentionLeases, final ActionListener listener) { - listener.onResponse(new ReplicationResponse()); - } - - @Override - public void backgroundSync(final ShardId shardId, final RetentionLeases retentionLeases) { - - } - }; + public interface SyncAction { + void sync(ShardId shardId, String primaryAllocationId, long primaryTerm, + RetentionLeases retentionLeases, ActionListener listener); + } + /** + * Represents an action that is invoked periodically to sync retention leases to replica shards after some retention + * lease has been renewed or expired. + */ + public interface BackgroundSyncAction { + void backgroundSync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases); + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7b6079bcb84..c86040d1108 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -342,7 +342,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl UNASSIGNED_SEQ_NO, globalCheckpointListeners::globalCheckpointUpdated, threadPool::absoluteTimeInMillis, - (retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, retentionLeases, listener), + (retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, aId, getPendingPrimaryTerm(), retentionLeases, listener), this::getSafeCommitInfo); // the query cache is a node-level thing, however we want the most popular filters @@ -2216,6 +2216,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl logger.trace("syncing retention leases [{}] after expiration check", retentionLeases.v2()); retentionLeaseSyncer.sync( shardId, + shardRouting.allocationId().getId(), + getPendingPrimaryTerm(), retentionLeases.v2(), ActionListener.wrap( r -> {}, @@ -2225,7 +2227,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl e))); } else { logger.trace("background syncing retention leases [{}] after expiration check", retentionLeases.v2()); - retentionLeaseSyncer.backgroundSync(shardId, retentionLeases.v2()); + retentionLeaseSyncer.backgroundSync( + shardId, shardRouting.allocationId().getId(), getPendingPrimaryTerm(), retentionLeases.v2()); } } diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesModule.java b/server/src/main/java/org/elasticsearch/indices/IndicesModule.java index b34bf7e89a6..068569c68cf 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -57,6 +57,9 @@ import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.TextFieldMapper; import org.elasticsearch.index.mapper.TypeFieldMapper; import org.elasticsearch.index.mapper.VersionFieldMapper; +import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction; +import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; import org.elasticsearch.indices.cluster.IndicesClusterStateService; @@ -249,6 +252,9 @@ public class IndicesModule extends AbstractModule { bind(GlobalCheckpointSyncAction.class).asEagerSingleton(); bind(TransportResyncReplicationAction.class).asEagerSingleton(); bind(PrimaryReplicaSyncer.class).asEagerSingleton(); + bind(RetentionLeaseSyncAction.class).asEagerSingleton(); + bind(RetentionLeaseBackgroundSyncAction.class).asEagerSingleton(); + bind(RetentionLeaseSyncer.class).asEagerSingleton(); } /** diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 9dac970a317..939658fddb6 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -24,7 +24,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; @@ -54,10 +53,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.seqno.ReplicationTracker; -import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction; -import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; import org.elasticsearch.index.seqno.RetentionLeaseSyncer; -import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardRelocatedException; @@ -140,11 +136,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple final SnapshotShardsService snapshotShardsService, final PrimaryReplicaSyncer primaryReplicaSyncer, final GlobalCheckpointSyncAction globalCheckpointSyncAction, - final RetentionLeaseSyncAction retentionLeaseSyncAction, - final RetentionLeaseBackgroundSyncAction retentionLeaseBackgroundSyncAction) { + final RetentionLeaseSyncer retentionLeaseSyncer) { this( settings, - (AllocatedIndices>) indicesService, + indicesService, clusterService, threadPool, recoveryTargetService, @@ -157,20 +152,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple snapshotShardsService, primaryReplicaSyncer, globalCheckpointSyncAction::updateGlobalCheckpointForShard, - new RetentionLeaseSyncer() { - @Override - public void sync( - final ShardId shardId, - final RetentionLeases retentionLeases, - final ActionListener listener) { - Objects.requireNonNull(retentionLeaseSyncAction).sync(shardId, retentionLeases, listener); - } - - @Override - public void backgroundSync(final ShardId shardId, final RetentionLeases retentionLeases) { - Objects.requireNonNull(retentionLeaseBackgroundSyncAction).backgroundSync(shardId, retentionLeases); - } - }); + retentionLeaseSyncer); } // for tests diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 34449d70671..56b48462b94 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -38,7 +38,6 @@ import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.replication.ReplicationResponse; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.CheckedSupplier; @@ -156,8 +155,7 @@ public class RecoverySourceHandler { IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e)); }; - final boolean useRetentionLeases = shard.indexSettings().isSoftDeleteEnabled() - && shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE; + final boolean softDeletesEnabled = shard.indexSettings().isSoftDeleteEnabled(); final SetOnce retentionLeaseRef = new SetOnce<>(); runUnderPrimaryPermit(() -> { @@ -169,7 +167,7 @@ public class RecoverySourceHandler { throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); } assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; - retentionLeaseRef.set(useRetentionLeases ? shard.getRetentionLeases().get( + retentionLeaseRef.set(softDeletesEnabled ? shard.getRetentionLeases().get( ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)) : null); }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); @@ -180,7 +178,7 @@ public class RecoverySourceHandler { = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo()) - && (useRetentionLeases == false + && (softDeletesEnabled == false || (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo())); // NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease, // because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's @@ -188,7 +186,7 @@ public class RecoverySourceHandler { // Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery // without having a complete history. - if (isSequenceNumberBasedRecovery && useRetentionLeases) { + if (isSequenceNumberBasedRecovery && softDeletesEnabled) { // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock retentionLock.close(); logger.trace("history is retained by {}", retentionLeaseRef.get()); @@ -227,7 +225,7 @@ public class RecoverySourceHandler { // advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can // always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled // down. - startingSeqNo = useRetentionLeases + startingSeqNo = softDeletesEnabled ? Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L : 0; logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo); @@ -245,7 +243,7 @@ public class RecoverySourceHandler { }); final StepListener deleteRetentionLeaseStep = new StepListener<>(); - if (useRetentionLeases) { + if (softDeletesEnabled) { runUnderPrimaryPermit(() -> { try { // If the target previously had a copy of this shard then a file-based recovery might move its global @@ -268,7 +266,7 @@ public class RecoverySourceHandler { assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]"); final Consumer> createRetentionLeaseAsync; - if (useRetentionLeases) { + if (softDeletesEnabled) { createRetentionLeaseAsync = l -> createRetentionLease(startingSeqNo, l); } else { createRetentionLeaseAsync = l -> l.onResponse(null); @@ -306,7 +304,7 @@ public class RecoverySourceHandler { final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo); resources.add(phase2Snapshot); - if (useRetentionLeases == false || isSequenceNumberBasedRecovery == false) { + if (softDeletesEnabled == false || isSequenceNumberBasedRecovery == false) { // we can release the retention lock here because the snapshot itself will retain the required operations. retentionLock.close(); } diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java index 63f4e225a16..3c31ad185d8 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java @@ -323,6 +323,40 @@ public class ReplicaShardAllocatorIT extends ESIntegTestCase { transportService.clearAllRules(); } + public void testPeerRecoveryForClosedIndices() throws Exception { + String indexName = "peer_recovery_closed_indices"; + internalCluster().ensureAtLeastNumDataNodes(1); + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms") + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") + .build()); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(1, 100)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(Collectors.toList())); + ensureActivePeerRecoveryRetentionLeasesAdvanced(indexName); + assertAcked(client().admin().indices().prepareClose(indexName)); + int numberOfReplicas = randomIntBetween(1, 2); + internalCluster().ensureAtLeastNumDataNodes(2 + numberOfReplicas); + assertAcked(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas))); + ensureGreen(indexName); + ensureActivePeerRecoveryRetentionLeasesAdvanced(indexName); + assertAcked(client().admin().cluster().prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put("cluster.routing.allocation.enable", "primaries").build())); + internalCluster().fullRestart(); + ensureYellow(indexName); + if (randomBoolean()) { + assertAcked(client().admin().indices().prepareOpen(indexName)); + client().admin().indices().prepareForceMerge(indexName).get(); + } + assertAcked(client().admin().cluster().prepareUpdateSettings() + .setPersistentSettings(Settings.builder().putNull("cluster.routing.allocation.enable").build())); + ensureGreen(indexName); + assertNoOpRecoveries(indexName); + } + private void ensureActivePeerRecoveryRetentionLeasesAdvanced(String indexName) throws Exception { assertBusy(() -> { Index index = resolveIndex(indexName); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java index 6d9457193c4..faccbc7ff48 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java @@ -17,17 +17,12 @@ package org.elasticsearch.index.seqno; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; -import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.internal.io.IOUtils; @@ -35,31 +30,21 @@ import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.node.NodeClosedException; -import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.SendRequestTransportException; -import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportService; -import org.mockito.ArgumentCaptor; import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; -import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions; import static org.elasticsearch.mock.orig.Mockito.when; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; -import static org.hamcrest.Matchers.arrayContaining; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.Matchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -173,73 +158,6 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase { assertTrue(success.get()); } - public void testRetentionLeaseSyncExecution() { - final IndicesService indicesService = mock(IndicesService.class); - - final Index index = new Index("index", "uuid"); - final IndexService indexService = mock(IndexService.class); - when(indicesService.indexServiceSafe(index)).thenReturn(indexService); - - final int id = randomIntBetween(0, 4); - final IndexShard indexShard = mock(IndexShard.class); - when(indexService.getShard(id)).thenReturn(indexShard); - - final ShardId shardId = new ShardId(index, id); - when(indexShard.shardId()).thenReturn(shardId); - - final Logger retentionLeaseSyncActionLogger = mock(Logger.class); - - final RetentionLeases retentionLeases = mock(RetentionLeases.class); - final AtomicBoolean invoked = new AtomicBoolean(); - final RetentionLeaseBackgroundSyncAction action = new RetentionLeaseBackgroundSyncAction( - Settings.EMPTY, - transportService, - clusterService, - indicesService, - threadPool, - shardStateAction, - new ActionFilters(Collections.emptySet())) { - - @Override - protected void doExecute(Task task, Request request, ActionListener listener) { - assertTrue(threadPool.getThreadContext().isSystemContext()); - assertThat(request.shardId(), sameInstance(indexShard.shardId())); - assertThat(request.getRetentionLeases(), sameInstance(retentionLeases)); - if (randomBoolean()) { - listener.onResponse(new ReplicationResponse()); - } else { - final Exception e = randomFrom( - new AlreadyClosedException("closed"), - new IndexShardClosedException(indexShard.shardId()), - new TransportException("failed"), - new SendRequestTransportException(null, randomFrom( - "some-action", - "indices:admin/seq_no/retention_lease_background_sync[p]" - ), new NodeClosedException((DiscoveryNode) null)), - new RuntimeException("failed")); - listener.onFailure(e); - if (e.getMessage().equals("failed")) { - final ArgumentCaptor captor = ArgumentCaptor.forClass(ParameterizedMessage.class); - verify(retentionLeaseSyncActionLogger).warn(captor.capture(), same(e)); - final ParameterizedMessage message = captor.getValue(); - assertThat(message.getFormat(), equalTo("{} retention lease background sync failed")); - assertThat(message.getParameters(), arrayContaining(indexShard.shardId())); - } - verifyNoMoreInteractions(retentionLeaseSyncActionLogger); - } - invoked.set(true); - } - - @Override - protected Logger getLogger() { - return retentionLeaseSyncActionLogger; - } - }; - - action.backgroundSync(indexShard.shardId(), retentionLeases); - assertTrue(invoked.get()); - } - public void testBlocks() { final IndicesService indicesService = mock(IndicesService.class); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java index 2c2b2adad72..086fc72a471 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -17,9 +17,6 @@ package org.elasticsearch.index.seqno; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; @@ -28,31 +25,23 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.mockito.ArgumentCaptor; import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; -import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions; import static org.elasticsearch.mock.orig.Mockito.when; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; -import static org.hamcrest.Matchers.arrayContaining; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.Matchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -126,7 +115,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase { )); } - public void testRetentionLeaseSyncActionOnReplica() throws WriteStateException { + public void testRetentionLeaseSyncActionOnReplica() throws Exception { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); @@ -163,68 +152,6 @@ public class RetentionLeaseSyncActionTests extends ESTestCase { assertTrue(success.get()); } - public void testRetentionLeaseSyncExecution() { - final IndicesService indicesService = mock(IndicesService.class); - - final Index index = new Index("index", "uuid"); - final IndexService indexService = mock(IndexService.class); - when(indicesService.indexServiceSafe(index)).thenReturn(indexService); - - final int id = randomIntBetween(0, 4); - final IndexShard indexShard = mock(IndexShard.class); - when(indexService.getShard(id)).thenReturn(indexShard); - - final ShardId shardId = new ShardId(index, id); - when(indexShard.shardId()).thenReturn(shardId); - - final Logger retentionLeaseSyncActionLogger = mock(Logger.class); - - final RetentionLeases retentionLeases = mock(RetentionLeases.class); - final AtomicBoolean invoked = new AtomicBoolean(); - final RetentionLeaseSyncAction action = new RetentionLeaseSyncAction( - Settings.EMPTY, - transportService, - clusterService, - indicesService, - threadPool, - shardStateAction, - new ActionFilters(Collections.emptySet())) { - - @Override - protected void doExecute(Task task, Request request, ActionListener listener) { - assertTrue(threadPool.getThreadContext().isSystemContext()); - assertThat(request.shardId(), sameInstance(indexShard.shardId())); - assertThat(request.getRetentionLeases(), sameInstance(retentionLeases)); - if (randomBoolean()) { - listener.onResponse(new Response()); - } else { - final Exception e = randomFrom( - new AlreadyClosedException("closed"), - new IndexShardClosedException(indexShard.shardId()), - new RuntimeException("failed")); - listener.onFailure(e); - if (e instanceof AlreadyClosedException == false && e instanceof IndexShardClosedException == false) { - final ArgumentCaptor captor = ArgumentCaptor.forClass(ParameterizedMessage.class); - verify(retentionLeaseSyncActionLogger).warn(captor.capture(), same(e)); - final ParameterizedMessage message = captor.getValue(); - assertThat(message.getFormat(), equalTo("{} retention lease sync failed")); - assertThat(message.getParameters(), arrayContaining(indexShard.shardId())); - } - verifyNoMoreInteractions(retentionLeaseSyncActionLogger); - } - invoked.set(true); - } - - @Override - protected Logger getLogger() { - return retentionLeaseSyncActionLogger; - } - }; - - // execution happens on the test thread, so no need to register an actual listener to callback - action.sync(indexShard.shardId(), retentionLeases, ActionListener.wrap(() -> {})); - assertTrue(invoked.get()); - } public void testBlocks() { final IndicesService indicesService = mock(IndicesService.class); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index ff69d956829..7b1e75c62cc 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -144,8 +144,9 @@ import org.elasticsearch.gateway.MetaStateService; import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; -import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction; -import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.IndicesService; @@ -1137,22 +1138,7 @@ public class SnapshotResiliencyTests extends ESTestCase { threadPool, shardStateAction, actionFilters), - new RetentionLeaseSyncAction( - settings, - transportService, - clusterService, - indicesService, - threadPool, - shardStateAction, - actionFilters), - new RetentionLeaseBackgroundSyncAction( - settings, - transportService, - clusterService, - indicesService, - threadPool, - shardStateAction, - actionFilters)); + RetentionLeaseSyncer.EMPTY); Map actions = new HashMap<>(); final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(settings, clusterService, indicesService, diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index 0837f431fff..db3818832f6 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -383,7 +383,7 @@ public class DeterministicTaskQueue { @Override public Runnable preserveContext(Runnable command) { - throw new UnsupportedOperationException(); + return command; } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 74cb4c8ab3c..8d4e43c04e9 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -181,21 +181,15 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase } }); - private final RetentionLeaseSyncer retentionLeaseSyncer = new RetentionLeaseSyncer() { - @Override - public void sync(ShardId shardId, RetentionLeases retentionLeases, ActionListener listener) { - syncRetentionLeases(shardId, retentionLeases, listener); - } - - @Override - public void backgroundSync(ShardId shardId, RetentionLeases retentionLeases) { - sync(shardId, retentionLeases, ActionListener.wrap( + private final RetentionLeaseSyncer retentionLeaseSyncer = new RetentionLeaseSyncer( + (shardId, primaryAllocationId, primaryTerm, retentionLeases, listener) -> + syncRetentionLeases(shardId, retentionLeases, listener), + (shardId, primaryAllocationId, primaryTerm, retentionLeases) -> syncRetentionLeases(shardId, retentionLeases, + ActionListener.wrap( r -> { }, e -> { throw new AssertionError("failed to background sync retention lease", e); - })); - } - }; + }))); protected ReplicationGroup(final IndexMetaData indexMetaData) throws IOException { final ShardRouting primaryRouting = this.createShardRouting("s0", true); diff --git a/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index c9c74e658f4..9f41ae758bf 100644 --- a/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -202,7 +202,7 @@ public class FollowIndexSecurityIT extends ESCCRRestTestCase { assertOK(client().performRequest(new Request("POST", "/" + forgetFollower + "/_ccr/pause_follow"))); - try (RestClient leaderClient = buildLeaderClient(restClientSettings())) { + try (RestClient leaderClient = buildLeaderClient(restAdminSettings())) { final Request request = new Request("POST", "/" + forgetLeader + "/_ccr/forget_follower"); final String requestBody = "{" + "\"follower_cluster\":\"follow-cluster\"," +