diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java index 1ef9a484a27..58e48f02155 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java @@ -91,7 +91,7 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable { if (logger.isTraceEnabled()) { logger.trace("scheduling {} every {}", toString(), interval); } - cancellable = threadPool.schedule(this, interval, getThreadPool()); + cancellable = threadPool.schedule(threadPool.preserveContext(this), interval, getThreadPool()); isScheduledOrRunning = true; } else { logger.trace("scheduled {} disabled", toString()); diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index f4aa145620c..81061c05805 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -838,10 +838,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L assert checkpoints.get(aId) != null : "aId [" + aId + "] is pending in sync but isn't tracked"; } - if (primaryMode - && indexSettings.isSoftDeleteEnabled() - && indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN - && hasAllPeerRecoveryRetentionLeases) { + if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) { // all tracked shard copies have a corresponding peer-recovery retention lease for (final ShardRouting shardRouting : routingTable.assignedShards()) { if (checkpoints.get(shardRouting.allocationId().getId()).tracked) { @@ -909,7 +906,9 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; - this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0); + this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_6_0) || + (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) && + indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN); this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings()); this.safeCommitInfoSupplier = safeCommitInfoSupplier; assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false; @@ -1022,34 +1021,32 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L assert primaryMode; assert Thread.holdsLock(this); - if (indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN) { - final ShardRouting primaryShard = routingTable.primaryShard(); - final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard); - if (retentionLeases.get(leaseId) == null) { - if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) { - assert primaryShard.allocationId().getId().equals(shardAllocationId) - : routingTable.assignedShards() + " vs " + shardAllocationId; - // Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication - // group. - logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId); - innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1), - PEER_RECOVERY_RETENTION_LEASE_SOURCE); - hasAllPeerRecoveryRetentionLeases = true; - } else { - /* - * We got here here via a rolling upgrade from an older version that doesn't create peer recovery retention - * leases for every shard copy, but in this case we do not expect any leases to exist. - */ - assert hasAllPeerRecoveryRetentionLeases == false : routingTable + " vs " + retentionLeases; - logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases); - } - } else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards().stream().allMatch(shardRouting -> - retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) - || checkpoints.get(shardRouting.allocationId().getId()).tracked == false)) { - // Although this index is old enough not to have all the expected peer recovery retention leases, in fact it does, so we - // don't need to do any more work. + final ShardRouting primaryShard = routingTable.primaryShard(); + final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard); + if (retentionLeases.get(leaseId) == null) { + if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) { + assert primaryShard.allocationId().getId().equals(shardAllocationId) + : routingTable.assignedShards() + " vs " + shardAllocationId; + // Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication + // group. + logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId); + innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1), + PEER_RECOVERY_RETENTION_LEASE_SOURCE); hasAllPeerRecoveryRetentionLeases = true; + } else { + /* + * We got here here via a rolling upgrade from an older version that doesn't create peer recovery retention + * leases for every shard copy, but in this case we do not expect any leases to exist. + */ + assert hasAllPeerRecoveryRetentionLeases == false : routingTable + " vs " + retentionLeases; + logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases); } + } else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards().stream().allMatch(shardRouting -> + retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting)) + || checkpoints.get(shardRouting.allocationId().getId()).tracked == false)) { + // Although this index is old enough not to have all the expected peer recovery retention leases, in fact it does, so we + // don't need to do any more work. + hasAllPeerRecoveryRetentionLeases = true; } } @@ -1371,10 +1368,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L * prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases. */ public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener listener) { - if (indexSettings().isSoftDeleteEnabled() - && indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN - && hasAllPeerRecoveryRetentionLeases == false) { - + if (indexSettings().isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases == false) { final List shardRoutings = routingTable.assignedShards(); final GroupedActionListener groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> { setHasAllPeerRecoveryRetentionLeases(); diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java index 661e095025f..1f6dd80a9f8 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.action.support.replication.ReplicationTask; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.service.ClusterService; @@ -36,17 +37,21 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Map; import java.util.Objects; /** @@ -60,8 +65,7 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi RetentionLeaseBackgroundSyncAction.Request, ReplicationResponse> { - public static String ACTION_NAME = "indices:admin/seq_no/retention_lease_background_sync"; - + public static final String ACTION_NAME = "indices:admin/seq_no/retention_lease_background_sync"; private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class); protected Logger getLogger() { @@ -91,37 +95,50 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi ThreadPool.Names.MANAGEMENT); } - /** - * Background sync the specified retention leases for the specified shard. - * - * @param shardId the shard to sync - * @param retentionLeases the retention leases to sync - */ - public void backgroundSync( - final ShardId shardId, - final RetentionLeases retentionLeases) { - Objects.requireNonNull(shardId); - Objects.requireNonNull(retentionLeases); - final ThreadContext threadContext = threadPool.getThreadContext(); - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - // we have to execute under the system context so that if security is enabled the sync is authorized - threadContext.markAsSystemContext(); - execute( - new Request(shardId, retentionLeases), - ActionListener.wrap( - r -> {}, - e -> { - if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) { - // node shutting down - return; - } - if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null) { - // the shard is closed - return; - } - getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e); - })); - } + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + assert false : "use RetentionLeaseBackgroundSyncAction#backgroundSync"; + } + + final void backgroundSync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases) { + final Request request = new Request(shardId, retentionLeases); + final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_background_sync", request); + transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction, + new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm), + task, + transportOptions, + new TransportResponseHandler() { + @Override + public ReplicationResponse read(StreamInput in) throws IOException { + return newResponseInstance(in); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public void handleResponse(ReplicationResponse response) { + task.setPhase("finished"); + taskManager.unregister(task); + } + + @Override + public void handleException(TransportException e) { + task.setPhase("finished"); + taskManager.unregister(task); + if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) { + // node shutting down + return; + } + if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null) { + // the shard is closed + return; + } + getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e); + } + }); } @Override @@ -171,6 +188,11 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi retentionLeases.writeTo(out); } + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ReplicationTask(id, type, action, "retention_lease_background_sync shardId=" + shardId, parentTaskId, headers); + } + @Override public String toString() { return "RetentionLeaseBackgroundSyncAction.Request{" + diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index 01f1023e490..44da3318698 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteResponse; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.action.support.replication.ReplicationTask; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -38,16 +39,20 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Map; import java.util.Objects; /** @@ -57,8 +62,7 @@ import java.util.Objects; public class RetentionLeaseSyncAction extends TransportWriteAction { - public static String ACTION_NAME = "indices:admin/seq_no/retention_lease_sync"; - + public static final String ACTION_NAME = "indices:admin/seq_no/retention_lease_sync"; private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class); protected Logger getLogger() { @@ -88,35 +92,47 @@ public class RetentionLeaseSyncAction extends ThreadPool.Names.MANAGEMENT, false); } - /** - * Sync the specified retention leases for the specified shard. The callback is invoked when the sync succeeds or fails. - * - * @param shardId the shard to sync - * @param retentionLeases the retention leases to sync - * @param listener the callback to invoke when the sync completes normally or abnormally - */ - public void sync( - final ShardId shardId, - final RetentionLeases retentionLeases, - final ActionListener listener) { - Objects.requireNonNull(shardId); - Objects.requireNonNull(retentionLeases); - Objects.requireNonNull(listener); - final ThreadContext threadContext = threadPool.getThreadContext(); - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - // we have to execute under the system context so that if security is enabled the sync is authorized - threadContext.markAsSystemContext(); - execute( - new RetentionLeaseSyncAction.Request(shardId, retentionLeases), - ActionListener.wrap( - listener::onResponse, - e -> { - if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { - getLogger().warn(new ParameterizedMessage("{} retention lease sync failed", shardId), e); - } - listener.onFailure(e); - })); - } + @Override + protected void doExecute(Task parentTask, Request request, ActionListener listener) { + assert false : "use RetentionLeaseSyncAction#sync"; + } + + final void sync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases, + ActionListener listener) { + final Request request = new Request(shardId, retentionLeases); + final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_sync", request); + transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction, + new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm), + task, + transportOptions, + new TransportResponseHandler() { + @Override + public ReplicationResponse read(StreamInput in) throws IOException { + return newResponseInstance(in); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public void handleResponse(ReplicationResponse response) { + task.setPhase("finished"); + taskManager.unregister(task); + listener.onResponse(response); + } + + @Override + public void handleException(TransportException e) { + if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { + getLogger().warn(new ParameterizedMessage("{} retention lease sync failed", shardId), e); + } + task.setPhase("finished"); + taskManager.unregister(task); + listener.onFailure(e); + } + }); } @Override @@ -172,6 +188,11 @@ public class RetentionLeaseSyncAction extends retentionLeases.writeTo(out); } + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new ReplicationTask(id, type, action, "retention_lease_sync shardId=" + shardId, parentTaskId, headers); + } + @Override public String toString() { return "RetentionLeaseSyncAction.Request{" + diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java index 7de6bad3f11..40f80fee2b0 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java @@ -21,36 +21,52 @@ package org.elasticsearch.index.seqno; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.common.inject.Inject; import org.elasticsearch.index.shard.ShardId; -/** - * A functional interface that represents a method for syncing retention leases to replica shards after a new retention lease is added on - * the primary. - */ -public interface RetentionLeaseSyncer { +import java.util.Objects; + +public class RetentionLeaseSyncer { + private final SyncAction syncAction; + private final BackgroundSyncAction backgroundSyncAction; + + @Inject + public RetentionLeaseSyncer(RetentionLeaseSyncAction syncAction, RetentionLeaseBackgroundSyncAction backgroundSyncAction) { + this(syncAction::sync, backgroundSyncAction::backgroundSync); + } + + public RetentionLeaseSyncer(SyncAction syncAction, BackgroundSyncAction backgroundSyncAction) { + this.syncAction = Objects.requireNonNull(syncAction); + this.backgroundSyncAction = Objects.requireNonNull(backgroundSyncAction); + } + + public static final RetentionLeaseSyncer EMPTY = new RetentionLeaseSyncer( + (shardId, primaryAllocationId, primaryTerm, retentionLeases, listener) -> listener.onResponse(new ReplicationResponse()), + (shardId, primaryAllocationId, primaryTerm, retentionLeases) -> { }); + + public void sync(ShardId shardId, String primaryAllocationId, long primaryTerm, + RetentionLeases retentionLeases, ActionListener listener) { + syncAction.sync(shardId, primaryAllocationId, primaryTerm, retentionLeases, listener); + } + + public void backgroundSync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases) { + backgroundSyncAction.backgroundSync(shardId, primaryAllocationId, primaryTerm, retentionLeases); + } /** - * Represents a method that when invoked syncs retention leases to replica shards after a new retention lease is added on the primary. - * The specified listener is invoked when the syncing completes with success or failure. - * - * @param shardId the shard ID - * @param retentionLeases the retention leases to sync - * @param listener the callback when sync completes + * Represents an action that is invoked to sync retention leases to replica shards after a retention lease is added + * or removed on the primary. The specified listener is invoked when the syncing completes with success or failure. */ - void sync(ShardId shardId, RetentionLeases retentionLeases, ActionListener listener); - - void backgroundSync(ShardId shardId, RetentionLeases retentionLeases); - - RetentionLeaseSyncer EMPTY = new RetentionLeaseSyncer() { - @Override - public void sync(final ShardId shardId, final RetentionLeases retentionLeases, final ActionListener listener) { - listener.onResponse(new ReplicationResponse()); - } - - @Override - public void backgroundSync(final ShardId shardId, final RetentionLeases retentionLeases) { - - } - }; + public interface SyncAction { + void sync(ShardId shardId, String primaryAllocationId, long primaryTerm, + RetentionLeases retentionLeases, ActionListener listener); + } + /** + * Represents an action that is invoked periodically to sync retention leases to replica shards after some retention + * lease has been renewed or expired. + */ + public interface BackgroundSyncAction { + void backgroundSync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases); + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7b6079bcb84..c86040d1108 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -342,7 +342,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl UNASSIGNED_SEQ_NO, globalCheckpointListeners::globalCheckpointUpdated, threadPool::absoluteTimeInMillis, - (retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, retentionLeases, listener), + (retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, aId, getPendingPrimaryTerm(), retentionLeases, listener), this::getSafeCommitInfo); // the query cache is a node-level thing, however we want the most popular filters @@ -2216,6 +2216,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl logger.trace("syncing retention leases [{}] after expiration check", retentionLeases.v2()); retentionLeaseSyncer.sync( shardId, + shardRouting.allocationId().getId(), + getPendingPrimaryTerm(), retentionLeases.v2(), ActionListener.wrap( r -> {}, @@ -2225,7 +2227,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl e))); } else { logger.trace("background syncing retention leases [{}] after expiration check", retentionLeases.v2()); - retentionLeaseSyncer.backgroundSync(shardId, retentionLeases.v2()); + retentionLeaseSyncer.backgroundSync( + shardId, shardRouting.allocationId().getId(), getPendingPrimaryTerm(), retentionLeases.v2()); } } diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesModule.java b/server/src/main/java/org/elasticsearch/indices/IndicesModule.java index b34bf7e89a6..068569c68cf 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -57,6 +57,9 @@ import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.TextFieldMapper; import org.elasticsearch.index.mapper.TypeFieldMapper; import org.elasticsearch.index.mapper.VersionFieldMapper; +import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction; +import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; import org.elasticsearch.indices.cluster.IndicesClusterStateService; @@ -249,6 +252,9 @@ public class IndicesModule extends AbstractModule { bind(GlobalCheckpointSyncAction.class).asEagerSingleton(); bind(TransportResyncReplicationAction.class).asEagerSingleton(); bind(PrimaryReplicaSyncer.class).asEagerSingleton(); + bind(RetentionLeaseSyncAction.class).asEagerSingleton(); + bind(RetentionLeaseBackgroundSyncAction.class).asEagerSingleton(); + bind(RetentionLeaseSyncer.class).asEagerSingleton(); } /** diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 9dac970a317..939658fddb6 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -24,7 +24,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; @@ -54,10 +53,7 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.seqno.ReplicationTracker; -import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction; -import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; import org.elasticsearch.index.seqno.RetentionLeaseSyncer; -import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardRelocatedException; @@ -140,11 +136,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple final SnapshotShardsService snapshotShardsService, final PrimaryReplicaSyncer primaryReplicaSyncer, final GlobalCheckpointSyncAction globalCheckpointSyncAction, - final RetentionLeaseSyncAction retentionLeaseSyncAction, - final RetentionLeaseBackgroundSyncAction retentionLeaseBackgroundSyncAction) { + final RetentionLeaseSyncer retentionLeaseSyncer) { this( settings, - (AllocatedIndices>) indicesService, + indicesService, clusterService, threadPool, recoveryTargetService, @@ -157,20 +152,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple snapshotShardsService, primaryReplicaSyncer, globalCheckpointSyncAction::updateGlobalCheckpointForShard, - new RetentionLeaseSyncer() { - @Override - public void sync( - final ShardId shardId, - final RetentionLeases retentionLeases, - final ActionListener listener) { - Objects.requireNonNull(retentionLeaseSyncAction).sync(shardId, retentionLeases, listener); - } - - @Override - public void backgroundSync(final ShardId shardId, final RetentionLeases retentionLeases) { - Objects.requireNonNull(retentionLeaseBackgroundSyncAction).backgroundSync(shardId, retentionLeases); - } - }); + retentionLeaseSyncer); } // for tests diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 34449d70671..56b48462b94 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -38,7 +38,6 @@ import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.replication.ReplicationResponse; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.CheckedSupplier; @@ -156,8 +155,7 @@ public class RecoverySourceHandler { IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e)); }; - final boolean useRetentionLeases = shard.indexSettings().isSoftDeleteEnabled() - && shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE; + final boolean softDeletesEnabled = shard.indexSettings().isSoftDeleteEnabled(); final SetOnce retentionLeaseRef = new SetOnce<>(); runUnderPrimaryPermit(() -> { @@ -169,7 +167,7 @@ public class RecoverySourceHandler { throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); } assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; - retentionLeaseRef.set(useRetentionLeases ? shard.getRetentionLeases().get( + retentionLeaseRef.set(softDeletesEnabled ? shard.getRetentionLeases().get( ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)) : null); }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); @@ -180,7 +178,7 @@ public class RecoverySourceHandler { = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && isTargetSameHistory() && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo()) - && (useRetentionLeases == false + && (softDeletesEnabled == false || (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo())); // NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease, // because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's @@ -188,7 +186,7 @@ public class RecoverySourceHandler { // Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery // without having a complete history. - if (isSequenceNumberBasedRecovery && useRetentionLeases) { + if (isSequenceNumberBasedRecovery && softDeletesEnabled) { // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock retentionLock.close(); logger.trace("history is retained by {}", retentionLeaseRef.get()); @@ -227,7 +225,7 @@ public class RecoverySourceHandler { // advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can // always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled // down. - startingSeqNo = useRetentionLeases + startingSeqNo = softDeletesEnabled ? Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L : 0; logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo); @@ -245,7 +243,7 @@ public class RecoverySourceHandler { }); final StepListener deleteRetentionLeaseStep = new StepListener<>(); - if (useRetentionLeases) { + if (softDeletesEnabled) { runUnderPrimaryPermit(() -> { try { // If the target previously had a copy of this shard then a file-based recovery might move its global @@ -268,7 +266,7 @@ public class RecoverySourceHandler { assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]"); final Consumer> createRetentionLeaseAsync; - if (useRetentionLeases) { + if (softDeletesEnabled) { createRetentionLeaseAsync = l -> createRetentionLease(startingSeqNo, l); } else { createRetentionLeaseAsync = l -> l.onResponse(null); @@ -306,7 +304,7 @@ public class RecoverySourceHandler { final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo); resources.add(phase2Snapshot); - if (useRetentionLeases == false || isSequenceNumberBasedRecovery == false) { + if (softDeletesEnabled == false || isSequenceNumberBasedRecovery == false) { // we can release the retention lock here because the snapshot itself will retain the required operations. retentionLock.close(); } diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java index 63f4e225a16..3c31ad185d8 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java @@ -323,6 +323,40 @@ public class ReplicaShardAllocatorIT extends ESIntegTestCase { transportService.clearAllRules(); } + public void testPeerRecoveryForClosedIndices() throws Exception { + String indexName = "peer_recovery_closed_indices"; + internalCluster().ensureAtLeastNumDataNodes(1); + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms") + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") + .build()); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(1, 100)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(Collectors.toList())); + ensureActivePeerRecoveryRetentionLeasesAdvanced(indexName); + assertAcked(client().admin().indices().prepareClose(indexName)); + int numberOfReplicas = randomIntBetween(1, 2); + internalCluster().ensureAtLeastNumDataNodes(2 + numberOfReplicas); + assertAcked(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas))); + ensureGreen(indexName); + ensureActivePeerRecoveryRetentionLeasesAdvanced(indexName); + assertAcked(client().admin().cluster().prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put("cluster.routing.allocation.enable", "primaries").build())); + internalCluster().fullRestart(); + ensureYellow(indexName); + if (randomBoolean()) { + assertAcked(client().admin().indices().prepareOpen(indexName)); + client().admin().indices().prepareForceMerge(indexName).get(); + } + assertAcked(client().admin().cluster().prepareUpdateSettings() + .setPersistentSettings(Settings.builder().putNull("cluster.routing.allocation.enable").build())); + ensureGreen(indexName); + assertNoOpRecoveries(indexName); + } + private void ensureActivePeerRecoveryRetentionLeasesAdvanced(String indexName) throws Exception { assertBusy(() -> { Index index = resolveIndex(indexName); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java index 6d9457193c4..faccbc7ff48 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java @@ -17,17 +17,12 @@ package org.elasticsearch.index.seqno; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; -import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.internal.io.IOUtils; @@ -35,31 +30,21 @@ import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.node.NodeClosedException; -import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.SendRequestTransportException; -import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportService; -import org.mockito.ArgumentCaptor; import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; -import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions; import static org.elasticsearch.mock.orig.Mockito.when; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; -import static org.hamcrest.Matchers.arrayContaining; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.Matchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -173,73 +158,6 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase { assertTrue(success.get()); } - public void testRetentionLeaseSyncExecution() { - final IndicesService indicesService = mock(IndicesService.class); - - final Index index = new Index("index", "uuid"); - final IndexService indexService = mock(IndexService.class); - when(indicesService.indexServiceSafe(index)).thenReturn(indexService); - - final int id = randomIntBetween(0, 4); - final IndexShard indexShard = mock(IndexShard.class); - when(indexService.getShard(id)).thenReturn(indexShard); - - final ShardId shardId = new ShardId(index, id); - when(indexShard.shardId()).thenReturn(shardId); - - final Logger retentionLeaseSyncActionLogger = mock(Logger.class); - - final RetentionLeases retentionLeases = mock(RetentionLeases.class); - final AtomicBoolean invoked = new AtomicBoolean(); - final RetentionLeaseBackgroundSyncAction action = new RetentionLeaseBackgroundSyncAction( - Settings.EMPTY, - transportService, - clusterService, - indicesService, - threadPool, - shardStateAction, - new ActionFilters(Collections.emptySet())) { - - @Override - protected void doExecute(Task task, Request request, ActionListener listener) { - assertTrue(threadPool.getThreadContext().isSystemContext()); - assertThat(request.shardId(), sameInstance(indexShard.shardId())); - assertThat(request.getRetentionLeases(), sameInstance(retentionLeases)); - if (randomBoolean()) { - listener.onResponse(new ReplicationResponse()); - } else { - final Exception e = randomFrom( - new AlreadyClosedException("closed"), - new IndexShardClosedException(indexShard.shardId()), - new TransportException("failed"), - new SendRequestTransportException(null, randomFrom( - "some-action", - "indices:admin/seq_no/retention_lease_background_sync[p]" - ), new NodeClosedException((DiscoveryNode) null)), - new RuntimeException("failed")); - listener.onFailure(e); - if (e.getMessage().equals("failed")) { - final ArgumentCaptor captor = ArgumentCaptor.forClass(ParameterizedMessage.class); - verify(retentionLeaseSyncActionLogger).warn(captor.capture(), same(e)); - final ParameterizedMessage message = captor.getValue(); - assertThat(message.getFormat(), equalTo("{} retention lease background sync failed")); - assertThat(message.getParameters(), arrayContaining(indexShard.shardId())); - } - verifyNoMoreInteractions(retentionLeaseSyncActionLogger); - } - invoked.set(true); - } - - @Override - protected Logger getLogger() { - return retentionLeaseSyncActionLogger; - } - }; - - action.backgroundSync(indexShard.shardId(), retentionLeases); - assertTrue(invoked.get()); - } - public void testBlocks() { final IndicesService indicesService = mock(IndicesService.class); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java index 2c2b2adad72..086fc72a471 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -17,9 +17,6 @@ package org.elasticsearch.index.seqno; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; @@ -28,31 +25,23 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.mockito.ArgumentCaptor; import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; -import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions; import static org.elasticsearch.mock.orig.Mockito.when; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; -import static org.hamcrest.Matchers.arrayContaining; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.Matchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -126,7 +115,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase { )); } - public void testRetentionLeaseSyncActionOnReplica() throws WriteStateException { + public void testRetentionLeaseSyncActionOnReplica() throws Exception { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); @@ -163,68 +152,6 @@ public class RetentionLeaseSyncActionTests extends ESTestCase { assertTrue(success.get()); } - public void testRetentionLeaseSyncExecution() { - final IndicesService indicesService = mock(IndicesService.class); - - final Index index = new Index("index", "uuid"); - final IndexService indexService = mock(IndexService.class); - when(indicesService.indexServiceSafe(index)).thenReturn(indexService); - - final int id = randomIntBetween(0, 4); - final IndexShard indexShard = mock(IndexShard.class); - when(indexService.getShard(id)).thenReturn(indexShard); - - final ShardId shardId = new ShardId(index, id); - when(indexShard.shardId()).thenReturn(shardId); - - final Logger retentionLeaseSyncActionLogger = mock(Logger.class); - - final RetentionLeases retentionLeases = mock(RetentionLeases.class); - final AtomicBoolean invoked = new AtomicBoolean(); - final RetentionLeaseSyncAction action = new RetentionLeaseSyncAction( - Settings.EMPTY, - transportService, - clusterService, - indicesService, - threadPool, - shardStateAction, - new ActionFilters(Collections.emptySet())) { - - @Override - protected void doExecute(Task task, Request request, ActionListener listener) { - assertTrue(threadPool.getThreadContext().isSystemContext()); - assertThat(request.shardId(), sameInstance(indexShard.shardId())); - assertThat(request.getRetentionLeases(), sameInstance(retentionLeases)); - if (randomBoolean()) { - listener.onResponse(new Response()); - } else { - final Exception e = randomFrom( - new AlreadyClosedException("closed"), - new IndexShardClosedException(indexShard.shardId()), - new RuntimeException("failed")); - listener.onFailure(e); - if (e instanceof AlreadyClosedException == false && e instanceof IndexShardClosedException == false) { - final ArgumentCaptor captor = ArgumentCaptor.forClass(ParameterizedMessage.class); - verify(retentionLeaseSyncActionLogger).warn(captor.capture(), same(e)); - final ParameterizedMessage message = captor.getValue(); - assertThat(message.getFormat(), equalTo("{} retention lease sync failed")); - assertThat(message.getParameters(), arrayContaining(indexShard.shardId())); - } - verifyNoMoreInteractions(retentionLeaseSyncActionLogger); - } - invoked.set(true); - } - - @Override - protected Logger getLogger() { - return retentionLeaseSyncActionLogger; - } - }; - - // execution happens on the test thread, so no need to register an actual listener to callback - action.sync(indexShard.shardId(), retentionLeases, ActionListener.wrap(() -> {})); - assertTrue(invoked.get()); - } public void testBlocks() { final IndicesService indicesService = mock(IndicesService.class); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index ff69d956829..7b1e75c62cc 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -144,8 +144,9 @@ import org.elasticsearch.gateway.MetaStateService; import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; -import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction; -import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.IndicesService; @@ -1137,22 +1138,7 @@ public class SnapshotResiliencyTests extends ESTestCase { threadPool, shardStateAction, actionFilters), - new RetentionLeaseSyncAction( - settings, - transportService, - clusterService, - indicesService, - threadPool, - shardStateAction, - actionFilters), - new RetentionLeaseBackgroundSyncAction( - settings, - transportService, - clusterService, - indicesService, - threadPool, - shardStateAction, - actionFilters)); + RetentionLeaseSyncer.EMPTY); Map actions = new HashMap<>(); final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(settings, clusterService, indicesService, diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index 0837f431fff..db3818832f6 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -383,7 +383,7 @@ public class DeterministicTaskQueue { @Override public Runnable preserveContext(Runnable command) { - throw new UnsupportedOperationException(); + return command; } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 74cb4c8ab3c..8d4e43c04e9 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -181,21 +181,15 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase } }); - private final RetentionLeaseSyncer retentionLeaseSyncer = new RetentionLeaseSyncer() { - @Override - public void sync(ShardId shardId, RetentionLeases retentionLeases, ActionListener listener) { - syncRetentionLeases(shardId, retentionLeases, listener); - } - - @Override - public void backgroundSync(ShardId shardId, RetentionLeases retentionLeases) { - sync(shardId, retentionLeases, ActionListener.wrap( + private final RetentionLeaseSyncer retentionLeaseSyncer = new RetentionLeaseSyncer( + (shardId, primaryAllocationId, primaryTerm, retentionLeases, listener) -> + syncRetentionLeases(shardId, retentionLeases, listener), + (shardId, primaryAllocationId, primaryTerm, retentionLeases) -> syncRetentionLeases(shardId, retentionLeases, + ActionListener.wrap( r -> { }, e -> { throw new AssertionError("failed to background sync retention lease", e); - })); - } - }; + }))); protected ReplicationGroup(final IndexMetaData indexMetaData) throws IOException { final ShardRouting primaryRouting = this.createShardRouting("s0", true); diff --git a/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index c9c74e658f4..9f41ae758bf 100644 --- a/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -202,7 +202,7 @@ public class FollowIndexSecurityIT extends ESCCRRestTestCase { assertOK(client().performRequest(new Request("POST", "/" + forgetFollower + "/_ccr/pause_follow"))); - try (RestClient leaderClient = buildLeaderClient(restClientSettings())) { + try (RestClient leaderClient = buildLeaderClient(restAdminSettings())) { final Request request = new Request("POST", "/" + forgetLeader + "/_ccr/forget_follower"); final String requestBody = "{" + "\"follower_cluster\":\"follow-cluster\"," +