Use retention lease in peer recovery of closed indices (#48430)

Today we do not use retention leases in peer recovery for closed indices
because we cannot sync retention leases on closed indices. This change adds
the ability to sync retention leases on closed indices and adjusts peer
recovery to use retention leases for all indices with soft deletes enabled.

Relates #45136

Co-authored-by: David Turner <david.turner@elastic.co>
Nhat Nguyen 2019-11-21 14:57:38 -05:00
parent bbb872a022
commit c151a75dfe
16 changed files with 250 additions and 349 deletions
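
In effect, the recovery source drops its index-state check: whether to use peer-recovery retention leases now depends only on soft deletes. A minimal before/after sketch of that condition, paraphrasing the RecoverySourceHandler hunks below (not a verbatim excerpt):

    // Before: retention leases were skipped when the index was closed.
    final boolean useRetentionLeases = shard.indexSettings().isSoftDeleteEnabled()
        && shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE;

    // After: soft deletes alone decide, so closed indices use leases too.
    final boolean softDeletesEnabled = shard.indexSettings().isSoftDeleteEnabled();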

View File

@@ -91,7 +91,7 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable {
             if (logger.isTraceEnabled()) {
                 logger.trace("scheduling {} every {}", toString(), interval);
             }
-            cancellable = threadPool.schedule(this, interval, getThreadPool());
+            cancellable = threadPool.schedule(threadPool.preserveContext(this), interval, getThreadPool());
             isScheduledOrRunning = true;
         } else {
             logger.trace("scheduled {} disabled", toString());

View File

@@ -838,10 +838,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
             assert checkpoints.get(aId) != null : "aId [" + aId + "] is pending in sync but isn't tracked";
         }
-        if (primaryMode
-            && indexSettings.isSoftDeleteEnabled()
-            && indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN
-            && hasAllPeerRecoveryRetentionLeases) {
+        if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) {
             // all tracked shard copies have a corresponding peer-recovery retention lease
             for (final ShardRouting shardRouting : routingTable.assignedShards()) {
                 if (checkpoints.get(shardRouting.allocationId().getId()).tracked) {
@@ -909,7 +906,9 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
         this.pendingInSync = new HashSet<>();
         this.routingTable = null;
         this.replicationGroup = null;
-        this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0);
+        this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_6_0) ||
+            (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) &&
+                indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN);
         this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings());
         this.safeCommitInfoSupplier = safeCommitInfoSupplier;
         assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
@@ -1022,7 +1021,6 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
         assert primaryMode;
         assert Thread.holdsLock(this);
-        if (indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN) {
             final ShardRouting primaryShard = routingTable.primaryShard();
             final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard);
             if (retentionLeases.get(leaseId) == null) {
@@ -1051,7 +1049,6 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
                 hasAllPeerRecoveryRetentionLeases = true;
             }
         }
-    }

     /**
      * Notifies the tracker of the current allocation IDs in the cluster state.
@@ -1371,10 +1368,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
      * prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases.
      */
     public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener<Void> listener) {
-        if (indexSettings().isSoftDeleteEnabled()
-            && indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN
-            && hasAllPeerRecoveryRetentionLeases == false) {
+        if (indexSettings().isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases == false) {
             final List<ShardRouting> shardRoutings = routingTable.assignedShards();
             final GroupedActionListener<ReplicationResponse> groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> {
                 setHasAllPeerRecoveryRetentionLeases();
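
The rewritten initializer encodes a compatibility rule that is easy to miss in diff form: indices created on or after 7.6 always start with peer-recovery retention leases, even when closed; 7.4-7.5 indices only when open, since those versions never created leases for closed indices; pre-7.4 indices never do. Restated as a standalone predicate (a paraphrase of the hunk above; hasAllLeases is a hypothetical name):

    // Paraphrase of the constructor logic above, not committed code.
    static boolean hasAllLeases(Version createdVersion, IndexMetaData.State state) {
        return createdVersion.onOrAfter(Version.V_7_6_0)
            || (createdVersion.onOrAfter(Version.V_7_4_0) && state == IndexMetaData.State.OPEN);
    }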

View File

@@ -29,6 +29,7 @@ import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.replication.ReplicationRequest;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
+import org.elasticsearch.action.support.replication.ReplicationTask;
 import org.elasticsearch.action.support.replication.TransportReplicationAction;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -36,17 +37,21 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.gateway.WriteStateException;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardClosedException;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.node.NodeClosedException;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportException;
+import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;

 import java.io.IOException;
+import java.util.Map;
 import java.util.Objects;

 /**
@@ -60,8 +65,7 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
         RetentionLeaseBackgroundSyncAction.Request,
         ReplicationResponse> {

-    public static String ACTION_NAME = "indices:admin/seq_no/retention_lease_background_sync";
+    public static final String ACTION_NAME = "indices:admin/seq_no/retention_lease_background_sync";
     private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class);

     protected Logger getLogger() {
@@ -91,26 +95,39 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
             ThreadPool.Names.MANAGEMENT);
     }

-    /**
-     * Background sync the specified retention leases for the specified shard.
-     *
-     * @param shardId         the shard to sync
-     * @param retentionLeases the retention leases to sync
-     */
-    public void backgroundSync(
-        final ShardId shardId,
-        final RetentionLeases retentionLeases) {
-        Objects.requireNonNull(shardId);
-        Objects.requireNonNull(retentionLeases);
-        final ThreadContext threadContext = threadPool.getThreadContext();
-        try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
-            // we have to execute under the system context so that if security is enabled the sync is authorized
-            threadContext.markAsSystemContext();
-            execute(
-                new Request(shardId, retentionLeases),
-                ActionListener.wrap(
-                    r -> {},
-                    e -> {
+    @Override
+    protected void doExecute(Task task, Request request, ActionListener<ReplicationResponse> listener) {
+        assert false : "use RetentionLeaseBackgroundSyncAction#backgroundSync";
+    }
+
+    final void backgroundSync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases) {
+        final Request request = new Request(shardId, retentionLeases);
+        final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_background_sync", request);
+        transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction,
+            new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
+            task,
+            transportOptions,
+            new TransportResponseHandler<ReplicationResponse>() {
+                @Override
+                public ReplicationResponse read(StreamInput in) throws IOException {
+                    return newResponseInstance(in);
+                }
+
+                @Override
+                public String executor() {
+                    return ThreadPool.Names.SAME;
+                }
+
+                @Override
+                public void handleResponse(ReplicationResponse response) {
+                    task.setPhase("finished");
+                    taskManager.unregister(task);
+                }
+
+                @Override
+                public void handleException(TransportException e) {
+                    task.setPhase("finished");
+                    taskManager.unregister(task);
                     if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) {
                         // node shutting down
                         return;
@@ -120,8 +137,8 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
                         return;
                     }
                     getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e);
-                }));
-        }
+                }
+            });
     }

     @Override
@@ -171,6 +188,11 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
             retentionLeases.writeTo(out);
         }

+        @Override
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            return new ReplicationTask(id, type, action, "retention_lease_background_sync shardId=" + shardId, parentTaskId, headers);
+        }
+
         @Override
         public String toString() {
             return "RetentionLeaseBackgroundSyncAction.Request{" +

View File

@@ -30,6 +30,7 @@ import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.WriteResponse;
 import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
+import org.elasticsearch.action.support.replication.ReplicationTask;
 import org.elasticsearch.action.support.replication.TransportWriteAction;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -38,16 +39,20 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.gateway.WriteStateException;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardClosedException;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportException;
+import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;

 import java.io.IOException;
+import java.util.Map;
 import java.util.Objects;

 /**
@@ -57,8 +62,7 @@ import java.util.Objects;
 public class RetentionLeaseSyncAction extends
         TransportWriteAction<RetentionLeaseSyncAction.Request, RetentionLeaseSyncAction.Request, RetentionLeaseSyncAction.Response> {

-    public static String ACTION_NAME = "indices:admin/seq_no/retention_lease_sync";
+    public static final String ACTION_NAME = "indices:admin/seq_no/retention_lease_sync";
     private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class);

     protected Logger getLogger() {
@@ -88,35 +92,47 @@ public class RetentionLeaseSyncAction extends
             ThreadPool.Names.MANAGEMENT, false);
     }

-    /**
-     * Sync the specified retention leases for the specified shard. The callback is invoked when the sync succeeds or fails.
-     *
-     * @param shardId         the shard to sync
-     * @param retentionLeases the retention leases to sync
-     * @param listener        the callback to invoke when the sync completes normally or abnormally
-     */
-    public void sync(
-        final ShardId shardId,
-        final RetentionLeases retentionLeases,
-        final ActionListener<ReplicationResponse> listener) {
-        Objects.requireNonNull(shardId);
-        Objects.requireNonNull(retentionLeases);
-        Objects.requireNonNull(listener);
-        final ThreadContext threadContext = threadPool.getThreadContext();
-        try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
-            // we have to execute under the system context so that if security is enabled the sync is authorized
-            threadContext.markAsSystemContext();
-            execute(
-                new RetentionLeaseSyncAction.Request(shardId, retentionLeases),
-                ActionListener.wrap(
-                    listener::onResponse,
-                    e -> {
+    @Override
+    protected void doExecute(Task parentTask, Request request, ActionListener<Response> listener) {
+        assert false : "use RetentionLeaseSyncAction#sync";
+    }
+
+    final void sync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases,
+                    ActionListener<ReplicationResponse> listener) {
+        final Request request = new Request(shardId, retentionLeases);
+        final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_sync", request);
+        transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction,
+            new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
+            task,
+            transportOptions,
+            new TransportResponseHandler<ReplicationResponse>() {
+                @Override
+                public ReplicationResponse read(StreamInput in) throws IOException {
+                    return newResponseInstance(in);
+                }
+
+                @Override
+                public String executor() {
+                    return ThreadPool.Names.SAME;
+                }
+
+                @Override
+                public void handleResponse(ReplicationResponse response) {
+                    task.setPhase("finished");
+                    taskManager.unregister(task);
+                    listener.onResponse(response);
+                }
+
+                @Override
+                public void handleException(TransportException e) {
                     if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
                         getLogger().warn(new ParameterizedMessage("{} retention lease sync failed", shardId), e);
                     }
+                    task.setPhase("finished");
+                    taskManager.unregister(task);
                     listener.onFailure(e);
-                }));
-        }
+                }
+            });
     }

     @Override
@@ -172,6 +188,11 @@ public class RetentionLeaseSyncAction extends
         retentionLeases.writeTo(out);
     }

+    @Override
+    public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+        return new ReplicationTask(id, type, action, "retention_lease_sync shardId=" + shardId, parentTaskId, headers);
+    }
+
     @Override
     public String toString() {
         return "RetentionLeaseSyncAction.Request{" +

View File

@@ -21,36 +21,52 @@ package org.elasticsearch.index.seqno;

 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
+import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.index.shard.ShardId;

-/**
- * A functional interface that represents a method for syncing retention leases to replica shards after a new retention lease is added on
- * the primary.
- */
-public interface RetentionLeaseSyncer {
-
-    /**
-     * Represents a method that when invoked syncs retention leases to replica shards after a new retention lease is added on the primary.
-     * The specified listener is invoked when the syncing completes with success or failure.
-     *
-     * @param shardId         the shard ID
-     * @param retentionLeases the retention leases to sync
-     * @param listener        the callback when sync completes
-     */
-    void sync(ShardId shardId, RetentionLeases retentionLeases, ActionListener<ReplicationResponse> listener);
-
-    void backgroundSync(ShardId shardId, RetentionLeases retentionLeases);
-
-    RetentionLeaseSyncer EMPTY = new RetentionLeaseSyncer() {
-        @Override
-        public void sync(final ShardId shardId, final RetentionLeases retentionLeases, final ActionListener<ReplicationResponse> listener) {
-            listener.onResponse(new ReplicationResponse());
-        }
-
-        @Override
-        public void backgroundSync(final ShardId shardId, final RetentionLeases retentionLeases) {
-        }
-    };
+import java.util.Objects;
+
+public class RetentionLeaseSyncer {
+    private final SyncAction syncAction;
+    private final BackgroundSyncAction backgroundSyncAction;
+
+    @Inject
+    public RetentionLeaseSyncer(RetentionLeaseSyncAction syncAction, RetentionLeaseBackgroundSyncAction backgroundSyncAction) {
+        this(syncAction::sync, backgroundSyncAction::backgroundSync);
+    }
+
+    public RetentionLeaseSyncer(SyncAction syncAction, BackgroundSyncAction backgroundSyncAction) {
+        this.syncAction = Objects.requireNonNull(syncAction);
+        this.backgroundSyncAction = Objects.requireNonNull(backgroundSyncAction);
+    }
+
+    public static final RetentionLeaseSyncer EMPTY = new RetentionLeaseSyncer(
+        (shardId, primaryAllocationId, primaryTerm, retentionLeases, listener) -> listener.onResponse(new ReplicationResponse()),
+        (shardId, primaryAllocationId, primaryTerm, retentionLeases) -> { });
+
+    public void sync(ShardId shardId, String primaryAllocationId, long primaryTerm,
+                     RetentionLeases retentionLeases, ActionListener<ReplicationResponse> listener) {
+        syncAction.sync(shardId, primaryAllocationId, primaryTerm, retentionLeases, listener);
+    }
+
+    public void backgroundSync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases) {
+        backgroundSyncAction.backgroundSync(shardId, primaryAllocationId, primaryTerm, retentionLeases);
+    }
+
+    /**
+     * Represents an action that is invoked to sync retention leases to replica shards after a retention lease is added
+     * or removed on the primary. The specified listener is invoked when the syncing completes with success or failure.
+     */
+    public interface SyncAction {
+        void sync(ShardId shardId, String primaryAllocationId, long primaryTerm,
+                  RetentionLeases retentionLeases, ActionListener<ReplicationResponse> listener);
+    }
+
+    /**
+     * Represents an action that is invoked periodically to sync retention leases to replica shards after some retention
+     * lease has been renewed or expired.
+     */
+    public interface BackgroundSyncAction {
+        void backgroundSync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases);
+    }
 }
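
Because SyncAction and BackgroundSyncAction each declare a single method, a syncer can be assembled from lambdas. A hypothetical test wiring (the recording lists and behavior are illustrative, not part of the commit):

    // Hypothetical test double: both constructor arguments are lambdas.
    final List<ShardId> syncedShards = new ArrayList<>();           // assumed helper state
    final List<ShardId> backgroundSyncedShards = new ArrayList<>(); // assumed helper state
    RetentionLeaseSyncer recordingSyncer = new RetentionLeaseSyncer(
        (shardId, primaryAllocationId, primaryTerm, retentionLeases, listener) -> {
            syncedShards.add(shardId);                      // record the foreground sync
            listener.onResponse(new ReplicationResponse()); // and report success
        },
        (shardId, primaryAllocationId, primaryTerm, retentionLeases) ->
            backgroundSyncedShards.add(shardId));           // record the background sync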

View File

@@ -342,7 +342,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             UNASSIGNED_SEQ_NO,
             globalCheckpointListeners::globalCheckpointUpdated,
             threadPool::absoluteTimeInMillis,
-            (retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, retentionLeases, listener),
+            (retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, aId, getPendingPrimaryTerm(), retentionLeases, listener),
             this::getSafeCommitInfo);

         // the query cache is a node-level thing, however we want the most popular filters
@@ -2216,6 +2216,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                     logger.trace("syncing retention leases [{}] after expiration check", retentionLeases.v2());
                     retentionLeaseSyncer.sync(
                         shardId,
+                        shardRouting.allocationId().getId(),
+                        getPendingPrimaryTerm(),
                         retentionLeases.v2(),
                         ActionListener.wrap(
                             r -> {},
@@ -2225,7 +2227,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                                 e)));
                 } else {
                     logger.trace("background syncing retention leases [{}] after expiration check", retentionLeases.v2());
-                    retentionLeaseSyncer.backgroundSync(shardId, retentionLeases.v2());
+                    retentionLeaseSyncer.backgroundSync(
+                        shardId, shardRouting.allocationId().getId(), getPendingPrimaryTerm(), retentionLeases.v2());
                 }
             }

View File

@@ -57,6 +57,9 @@ import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.index.mapper.TextFieldMapper;
 import org.elasticsearch.index.mapper.TypeFieldMapper;
 import org.elasticsearch.index.mapper.VersionFieldMapper;
+import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction;
+import org.elasticsearch.index.seqno.RetentionLeaseSyncAction;
+import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
 import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
 import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
 import org.elasticsearch.indices.cluster.IndicesClusterStateService;
@@ -249,6 +252,9 @@ public class IndicesModule extends AbstractModule {
         bind(GlobalCheckpointSyncAction.class).asEagerSingleton();
         bind(TransportResyncReplicationAction.class).asEagerSingleton();
         bind(PrimaryReplicaSyncer.class).asEagerSingleton();
+        bind(RetentionLeaseSyncAction.class).asEagerSingleton();
+        bind(RetentionLeaseBackgroundSyncAction.class).asEagerSingleton();
+        bind(RetentionLeaseSyncer.class).asEagerSingleton();
     }

     /**
/** /**

View File

@@ -24,7 +24,6 @@ import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateApplier;
@@ -54,10 +53,7 @@ import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
 import org.elasticsearch.index.seqno.ReplicationTracker;
-import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction;
-import org.elasticsearch.index.seqno.RetentionLeaseSyncAction;
 import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
-import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardRelocatedException;
@@ -140,11 +136,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
             final SnapshotShardsService snapshotShardsService,
             final PrimaryReplicaSyncer primaryReplicaSyncer,
             final GlobalCheckpointSyncAction globalCheckpointSyncAction,
-            final RetentionLeaseSyncAction retentionLeaseSyncAction,
-            final RetentionLeaseBackgroundSyncAction retentionLeaseBackgroundSyncAction) {
+            final RetentionLeaseSyncer retentionLeaseSyncer) {
         this(
             settings,
-            (AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>>) indicesService,
+            indicesService,
             clusterService,
             threadPool,
             recoveryTargetService,
@@ -157,20 +152,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
             snapshotShardsService,
             primaryReplicaSyncer,
             globalCheckpointSyncAction::updateGlobalCheckpointForShard,
-            new RetentionLeaseSyncer() {
-                @Override
-                public void sync(
-                    final ShardId shardId,
-                    final RetentionLeases retentionLeases,
-                    final ActionListener<ReplicationResponse> listener) {
-                    Objects.requireNonNull(retentionLeaseSyncAction).sync(shardId, retentionLeases, listener);
-                }
-
-                @Override
-                public void backgroundSync(final ShardId shardId, final RetentionLeases retentionLeases) {
-                    Objects.requireNonNull(retentionLeaseBackgroundSyncAction).backgroundSync(shardId, retentionLeases);
-                }
-            });
+            retentionLeaseSyncer);
     }

     // for tests

View File

@@ -38,7 +38,6 @@ import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.CheckedSupplier;
@@ -156,8 +155,7 @@ public class RecoverySourceHandler {
             IOUtils.closeWhileHandlingException(releaseResources, () -> wrappedListener.onFailure(e));
         };

-        final boolean useRetentionLeases = shard.indexSettings().isSoftDeleteEnabled()
-            && shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE;
+        final boolean softDeletesEnabled = shard.indexSettings().isSoftDeleteEnabled();
         final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>();

         runUnderPrimaryPermit(() -> {
@@ -169,7 +167,7 @@ public class RecoverySourceHandler {
                 throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
             }
             assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
-            retentionLeaseRef.set(useRetentionLeases ? shard.getRetentionLeases().get(
+            retentionLeaseRef.set(softDeletesEnabled ? shard.getRetentionLeases().get(
                 ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)) : null);
         }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ",
             shard, cancellableThreads, logger);
@@ -180,7 +178,7 @@ public class RecoverySourceHandler {
             = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
             && isTargetSameHistory()
             && shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo())
-            && (useRetentionLeases == false
+            && (softDeletesEnabled == false
                 || (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo()));
         // NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease,
         // because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's
@@ -188,7 +186,7 @@ public class RecoverySourceHandler {
         // Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery
         // without having a complete history.

-        if (isSequenceNumberBasedRecovery && useRetentionLeases) {
+        if (isSequenceNumberBasedRecovery && softDeletesEnabled) {
             // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock
             retentionLock.close();
             logger.trace("history is retained by {}", retentionLeaseRef.get());
@@ -227,7 +225,7 @@ public class RecoverySourceHandler {
                 // advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can
                 // always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled
                 // down.
-                startingSeqNo = useRetentionLeases
+                startingSeqNo = softDeletesEnabled
                     ? Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L
                     : 0;
                 logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo);
@@ -245,7 +243,7 @@ public class RecoverySourceHandler {
             });

             final StepListener<ReplicationResponse> deleteRetentionLeaseStep = new StepListener<>();
-            if (useRetentionLeases) {
+            if (softDeletesEnabled) {
                 runUnderPrimaryPermit(() -> {
                     try {
                         // If the target previously had a copy of this shard then a file-based recovery might move its global
@@ -268,7 +266,7 @@ public class RecoverySourceHandler {
             assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]");

             final Consumer<ActionListener<RetentionLease>> createRetentionLeaseAsync;
-            if (useRetentionLeases) {
+            if (softDeletesEnabled) {
                 createRetentionLeaseAsync = l -> createRetentionLease(startingSeqNo, l);
             } else {
                 createRetentionLeaseAsync = l -> l.onResponse(null);
@@ -306,7 +304,7 @@ public class RecoverySourceHandler {
             final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
             resources.add(phase2Snapshot);

-            if (useRetentionLeases == false || isSequenceNumberBasedRecovery == false) {
+            if (softDeletesEnabled == false || isSequenceNumberBasedRecovery == false) {
                 // we can release the retention lock here because the snapshot itself will retain the required operations.
                 retentionLock.close();
             }
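
Condensing the gate above: with the rename, a recovery is sequence-number-based only when the target shares history, the source retains a complete set of the needed operations, and, when soft deletes are enabled, an existing peer-recovery retention lease already covers the requested starting point. A sketch of the combined predicate, paraphrased from the hunks above:

    // retentionLease is the lease found (possibly null) for the recovery target;
    // startingSeqNo comes from the recovery request.
    final boolean isSequenceNumberBasedRecovery =
        startingSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO
            && isTargetSameHistory()
            && shard.hasCompleteHistoryOperations("peer-recovery", startingSeqNo)
            && (softDeletesEnabled == false
                || (retentionLease != null && retentionLease.retainingSequenceNumber() <= startingSeqNo));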

View File

@@ -323,6 +323,40 @@ public class ReplicaShardAllocatorIT extends ESIntegTestCase {
         transportService.clearAllRules();
     }

+    public void testPeerRecoveryForClosedIndices() throws Exception {
+        String indexName = "peer_recovery_closed_indices";
+        internalCluster().ensureAtLeastNumDataNodes(1);
+        createIndex(indexName, Settings.builder()
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
+            .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms")
+            .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")
+            .build());
+        indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(1, 100))
+            .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(Collectors.toList()));
+        ensureActivePeerRecoveryRetentionLeasesAdvanced(indexName);
+        assertAcked(client().admin().indices().prepareClose(indexName));
+        int numberOfReplicas = randomIntBetween(1, 2);
+        internalCluster().ensureAtLeastNumDataNodes(2 + numberOfReplicas);
+        assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
+            .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)));
+        ensureGreen(indexName);
+        ensureActivePeerRecoveryRetentionLeasesAdvanced(indexName);
+        assertAcked(client().admin().cluster().prepareUpdateSettings()
+            .setPersistentSettings(Settings.builder().put("cluster.routing.allocation.enable", "primaries").build()));
+        internalCluster().fullRestart();
+        ensureYellow(indexName);
+        if (randomBoolean()) {
+            assertAcked(client().admin().indices().prepareOpen(indexName));
+            client().admin().indices().prepareForceMerge(indexName).get();
+        }
+        assertAcked(client().admin().cluster().prepareUpdateSettings()
+            .setPersistentSettings(Settings.builder().putNull("cluster.routing.allocation.enable").build()));
+        ensureGreen(indexName);
+        assertNoOpRecoveries(indexName);
+    }
+
     private void ensureActivePeerRecoveryRetentionLeasesAdvanced(String indexName) throws Exception {
         assertBusy(() -> {
             Index index = resolveIndex(indexName);

View File

@@ -17,17 +17,12 @@

 package org.elasticsearch.index.seqno;

-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.LatchedActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ActionTestUtils;
-import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.action.support.replication.TransportReplicationAction;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
-import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.internal.io.IOUtils;
@@ -35,31 +30,21 @@ import org.elasticsearch.gateway.WriteStateException;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.IndexShardClosedException;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
-import org.elasticsearch.node.NodeClosedException;
-import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.transport.CapturingTransport;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.SendRequestTransportException;
-import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportService;
-import org.mockito.ArgumentCaptor;

 import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;

-import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions;
 import static org.elasticsearch.mock.orig.Mockito.when;
 import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
-import static org.hamcrest.Matchers.arrayContaining;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.sameInstance;
-import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -173,73 +158,6 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
         assertTrue(success.get());
     }

-    public void testRetentionLeaseSyncExecution() {
-        final IndicesService indicesService = mock(IndicesService.class);
-
-        final Index index = new Index("index", "uuid");
-        final IndexService indexService = mock(IndexService.class);
-        when(indicesService.indexServiceSafe(index)).thenReturn(indexService);
-
-        final int id = randomIntBetween(0, 4);
-        final IndexShard indexShard = mock(IndexShard.class);
-        when(indexService.getShard(id)).thenReturn(indexShard);
-
-        final ShardId shardId = new ShardId(index, id);
-        when(indexShard.shardId()).thenReturn(shardId);
-
-        final Logger retentionLeaseSyncActionLogger = mock(Logger.class);
-
-        final RetentionLeases retentionLeases = mock(RetentionLeases.class);
-        final AtomicBoolean invoked = new AtomicBoolean();
-        final RetentionLeaseBackgroundSyncAction action = new RetentionLeaseBackgroundSyncAction(
-            Settings.EMPTY,
-            transportService,
-            clusterService,
-            indicesService,
-            threadPool,
-            shardStateAction,
-            new ActionFilters(Collections.emptySet())) {
-
-            @Override
-            protected void doExecute(Task task, Request request, ActionListener<ReplicationResponse> listener) {
-                assertTrue(threadPool.getThreadContext().isSystemContext());
-                assertThat(request.shardId(), sameInstance(indexShard.shardId()));
-                assertThat(request.getRetentionLeases(), sameInstance(retentionLeases));
-                if (randomBoolean()) {
-                    listener.onResponse(new ReplicationResponse());
-                } else {
-                    final Exception e = randomFrom(
-                        new AlreadyClosedException("closed"),
-                        new IndexShardClosedException(indexShard.shardId()),
-                        new TransportException("failed"),
-                        new SendRequestTransportException(null, randomFrom(
-                            "some-action",
-                            "indices:admin/seq_no/retention_lease_background_sync[p]"
-                        ), new NodeClosedException((DiscoveryNode) null)),
-                        new RuntimeException("failed"));
-                    listener.onFailure(e);
-                    if (e.getMessage().equals("failed")) {
-                        final ArgumentCaptor<ParameterizedMessage> captor = ArgumentCaptor.forClass(ParameterizedMessage.class);
-                        verify(retentionLeaseSyncActionLogger).warn(captor.capture(), same(e));
-                        final ParameterizedMessage message = captor.getValue();
-                        assertThat(message.getFormat(), equalTo("{} retention lease background sync failed"));
-                        assertThat(message.getParameters(), arrayContaining(indexShard.shardId()));
-                    }
-                    verifyNoMoreInteractions(retentionLeaseSyncActionLogger);
-                }
-                invoked.set(true);
-            }
-
-            @Override
-            protected Logger getLogger() {
-                return retentionLeaseSyncActionLogger;
-            }
-        };
-
-        action.backgroundSync(indexShard.shardId(), retentionLeases);
-        assertTrue(invoked.get());
-    }
-
     public void testBlocks() {
         final IndicesService indicesService = mock(IndicesService.class);

View File

@@ -17,9 +17,6 @@

 package org.elasticsearch.index.seqno;

-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ActionTestUtils;
@@ -28,31 +25,23 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.internal.io.IOUtils;
-import org.elasticsearch.gateway.WriteStateException;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.IndexShardClosedException;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
-import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.transport.CapturingTransport;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
-import org.mockito.ArgumentCaptor;

 import java.util.Collections;
 import java.util.concurrent.atomic.AtomicBoolean;

-import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions;
 import static org.elasticsearch.mock.orig.Mockito.when;
 import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
-import static org.hamcrest.Matchers.arrayContaining;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.sameInstance;
-import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -126,7 +115,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
         ));
     }

-    public void testRetentionLeaseSyncActionOnReplica() throws WriteStateException {
+    public void testRetentionLeaseSyncActionOnReplica() throws Exception {
         final IndicesService indicesService = mock(IndicesService.class);

         final Index index = new Index("index", "uuid");
@@ -163,68 +152,6 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
         assertTrue(success.get());
     }

-    public void testRetentionLeaseSyncExecution() {
-        final IndicesService indicesService = mock(IndicesService.class);
-        final Index index = new Index("index", "uuid");
-        final IndexService indexService = mock(IndexService.class);
-        when(indicesService.indexServiceSafe(index)).thenReturn(indexService);
-        final int id = randomIntBetween(0, 4);
-        final IndexShard indexShard = mock(IndexShard.class);
-        when(indexService.getShard(id)).thenReturn(indexShard);
-        final ShardId shardId = new ShardId(index, id);
-        when(indexShard.shardId()).thenReturn(shardId);
-
-        final Logger retentionLeaseSyncActionLogger = mock(Logger.class);
-
-        final RetentionLeases retentionLeases = mock(RetentionLeases.class);
-        final AtomicBoolean invoked = new AtomicBoolean();
-        final RetentionLeaseSyncAction action = new RetentionLeaseSyncAction(
-            Settings.EMPTY,
-            transportService,
-            clusterService,
-            indicesService,
-            threadPool,
-            shardStateAction,
-            new ActionFilters(Collections.emptySet())) {
-
-            @Override
-            protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
-                assertTrue(threadPool.getThreadContext().isSystemContext());
-                assertThat(request.shardId(), sameInstance(indexShard.shardId()));
-                assertThat(request.getRetentionLeases(), sameInstance(retentionLeases));
-                if (randomBoolean()) {
-                    listener.onResponse(new Response());
-                } else {
-                    final Exception e = randomFrom(
-                        new AlreadyClosedException("closed"),
-                        new IndexShardClosedException(indexShard.shardId()),
-                        new RuntimeException("failed"));
-                    listener.onFailure(e);
-                    if (e instanceof AlreadyClosedException == false && e instanceof IndexShardClosedException == false) {
-                        final ArgumentCaptor<ParameterizedMessage> captor = ArgumentCaptor.forClass(ParameterizedMessage.class);
-                        verify(retentionLeaseSyncActionLogger).warn(captor.capture(), same(e));
-                        final ParameterizedMessage message = captor.getValue();
-                        assertThat(message.getFormat(), equalTo("{} retention lease sync failed"));
-                        assertThat(message.getParameters(), arrayContaining(indexShard.shardId()));
-                    }
-                    verifyNoMoreInteractions(retentionLeaseSyncActionLogger);
-                }
-                invoked.set(true);
-            }
-
-            @Override
-            protected Logger getLogger() {
-                return retentionLeaseSyncActionLogger;
-            }
-        };
-
-        // execution happens on the test thread, so no need to register an actual listener to callback
-        action.sync(indexShard.shardId(), retentionLeases, ActionListener.wrap(() -> {}));
-        assertTrue(invoked.get());
-    }
-
     public void testBlocks() {
         final IndicesService indicesService = mock(IndicesService.class);

View File

@@ -144,8 +144,9 @@ import org.elasticsearch.gateway.MetaStateService;
 import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
-import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction;
-import org.elasticsearch.index.seqno.RetentionLeaseSyncAction;
+import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
 import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
 import org.elasticsearch.indices.IndicesModule;
 import org.elasticsearch.indices.IndicesService;
@@ -1137,22 +1138,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                         threadPool,
                         shardStateAction,
                         actionFilters),
-                    new RetentionLeaseSyncAction(
-                        settings,
-                        transportService,
-                        clusterService,
-                        indicesService,
-                        threadPool,
-                        shardStateAction,
-                        actionFilters),
-                    new RetentionLeaseBackgroundSyncAction(
-                        settings,
-                        transportService,
-                        clusterService,
-                        indicesService,
-                        threadPool,
-                        shardStateAction,
-                        actionFilters));
+                    RetentionLeaseSyncer.EMPTY);
             Map<ActionType, TransportAction> actions = new HashMap<>();
             final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(settings, clusterService,
                 indicesService,

View File

@@ -383,7 +383,7 @@ public class DeterministicTaskQueue {

             @Override
             public Runnable preserveContext(Runnable command) {
-                throw new UnsupportedOperationException();
+                return command;
             }

             @Override

View File

@@ -181,21 +181,15 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
             }
         });

-        private final RetentionLeaseSyncer retentionLeaseSyncer = new RetentionLeaseSyncer() {
-            @Override
-            public void sync(ShardId shardId, RetentionLeases retentionLeases, ActionListener<ReplicationResponse> listener) {
-                syncRetentionLeases(shardId, retentionLeases, listener);
-            }
-
-            @Override
-            public void backgroundSync(ShardId shardId, RetentionLeases retentionLeases) {
-                sync(shardId, retentionLeases, ActionListener.wrap(
-                    r -> { },
-                    e -> {
-                        throw new AssertionError("failed to background sync retention lease", e);
-                    }));
-            }
-        };
+        private final RetentionLeaseSyncer retentionLeaseSyncer = new RetentionLeaseSyncer(
+            (shardId, primaryAllocationId, primaryTerm, retentionLeases, listener) ->
+                syncRetentionLeases(shardId, retentionLeases, listener),
+            (shardId, primaryAllocationId, primaryTerm, retentionLeases) -> syncRetentionLeases(shardId, retentionLeases,
+                ActionListener.wrap(
+                    r -> { },
+                    e -> {
+                        throw new AssertionError("failed to background sync retention lease", e);
+                    })));

         protected ReplicationGroup(final IndexMetaData indexMetaData) throws IOException {
             final ShardRouting primaryRouting = this.createShardRouting("s0", true);

View File

@@ -202,7 +202,7 @@ public class FollowIndexSecurityIT extends ESCCRRestTestCase {
         assertOK(client().performRequest(new Request("POST", "/" + forgetFollower + "/_ccr/pause_follow")));

-        try (RestClient leaderClient = buildLeaderClient(restClientSettings())) {
+        try (RestClient leaderClient = buildLeaderClient(restAdminSettings())) {
             final Request request = new Request("POST", "/" + forgetLeader + "/_ccr/forget_follower");
             final String requestBody = "{" +
                 "\"follower_cluster\":\"follow-cluster\"," +