Renew retention leases while following (#39335)

This commit is the final piece of the integration of CCR with retention
leases. Namely, we periodically renew retention leases and advance the
retaining sequence number while following.
This commit is contained in:
Jason Tedor 2019-02-25 17:07:55 -05:00
parent 7b8178c839
commit a6c0166d68
No known key found for this signature in database
GPG Key ID: FA89F05560F16BC5
10 changed files with 788 additions and 99 deletions

View File

@ -540,12 +540,16 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
new SyncRetentionLeases(request, ReplicationGroup.this, wrappedListener).execute();
}
public RetentionLease addRetentionLease(String id, long retainingSequenceNumber, String source,
public synchronized RetentionLease addRetentionLease(String id, long retainingSequenceNumber, String source,
ActionListener<ReplicationResponse> listener) {
return getPrimary().addRetentionLease(id, retainingSequenceNumber, source, listener);
}
public void removeRetentionLease(String id, ActionListener<ReplicationResponse> listener) {
public synchronized RetentionLease renewRetentionLease(String id, long retainingSequenceNumber, String source) {
return getPrimary().renewRetentionLease(id, retainingSequenceNumber, source);
}
public synchronized void removeRetentionLease(String id, ActionListener<ReplicationResponse> listener) {
getPrimary().removeRetentionLease(id, listener);
}

View File

@ -9,6 +9,7 @@ package org.elasticsearch.xpack.ccr;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
@ -18,11 +19,18 @@ import org.elasticsearch.index.shard.ShardId;
import java.util.Locale;
import java.util.Optional;
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import java.util.concurrent.TimeUnit;
public class CcrRetentionLeases {
// this setting is intentionally not registered, it is only used in tests
public static final Setting<TimeValue> RETENTION_LEASE_RENEW_INTERVAL_SETTING =
Setting.timeSetting(
"index.ccr.retention_lease.renew_interval",
new TimeValue(5, TimeUnit.MINUTES),
new TimeValue(0, TimeUnit.MILLISECONDS),
Setting.Property.NodeScope);
/**
* The retention lease ID used by followers.
*
@ -52,20 +60,22 @@ public class CcrRetentionLeases {
* Synchronously requests to add a retention lease with the specified retention lease ID on the specified leader shard using the given
* remote client. Note that this method will block up to the specified timeout.
*
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param remoteClient the remote client on which to execute this request
* @param timeout the timeout
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param retainingSequenceNumber the retaining sequence number
* @param remoteClient the remote client on which to execute this request
* @param timeout the timeout
* @return an optional exception indicating whether or not the retention lease already exists
*/
public static Optional<RetentionLeaseAlreadyExistsException> syncAddRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final long retainingSequenceNumber,
final Client remoteClient,
final TimeValue timeout) {
try {
final PlainActionFuture<RetentionLeaseActions.Response> response = new PlainActionFuture<>();
asyncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response);
asyncAddRetentionLease(leaderShardId, retentionLeaseId, retainingSequenceNumber, remoteClient, response);
response.actionGet(timeout);
return Optional.empty();
} catch (final RetentionLeaseAlreadyExistsException e) {
@ -78,18 +88,20 @@ public class CcrRetentionLeases {
* remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a response
* or failure.
*
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param remoteClient the remote client on which to execute this request
* @param listener the listener
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param retainingSequenceNumber the retaining sequence number
* @param remoteClient the remote client on which to execute this request
* @param listener the listener
*/
public static void asyncAddRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final long retainingSequenceNumber,
final Client remoteClient,
final ActionListener<RetentionLeaseActions.Response> listener) {
final RetentionLeaseActions.AddRequest request =
new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr");
new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, retainingSequenceNumber, "ccr");
remoteClient.execute(RetentionLeaseActions.Add.INSTANCE, request, listener);
}
@ -97,20 +109,22 @@ public class CcrRetentionLeases {
* Synchronously requests to renew a retention lease with the specified retention lease ID on the specified leader shard using the given
* remote client. Note that this method will block up to the specified timeout.
*
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param remoteClient the remote client on which to execute this request
* @param timeout the timeout
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param retainingSequenceNumber the retaining sequence number
* @param remoteClient the remote client on which to execute this request
* @param timeout the timeout
* @return an optional exception indicating whether or not the retention lease already exists
*/
public static Optional<RetentionLeaseNotFoundException> syncRenewRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final long retainingSequenceNumber,
final Client remoteClient,
final TimeValue timeout) {
try {
final PlainActionFuture<RetentionLeaseActions.Response> response = new PlainActionFuture<>();
asyncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response);
asyncRenewRetentionLease(leaderShardId, retentionLeaseId, retainingSequenceNumber, remoteClient, response);
response.actionGet(timeout);
return Optional.empty();
} catch (final RetentionLeaseNotFoundException e) {
@ -123,18 +137,20 @@ public class CcrRetentionLeases {
* given remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a
* response or failure.
*
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param remoteClient the remote client on which to execute this request
* @param listener the listener
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param retainingSequenceNumber the retaining sequence number
* @param remoteClient the remote client on which to execute this request
* @param listener the listener
*/
public static void asyncRenewRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final long retainingSequenceNumber,
final Client remoteClient,
final ActionListener<RetentionLeaseActions.Response> listener) {
final RetentionLeaseActions.RenewRequest request =
new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr");
new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, retainingSequenceNumber, "ccr");
remoteClient.execute(RetentionLeaseActions.Renew.INSTANCE, request, listener);
}

View File

@ -30,9 +30,10 @@ import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.transport.NoSuchRemoteClusterException;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
@ -94,6 +95,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private volatile ElasticsearchException fatalException;
private Scheduler.Cancellable renewable;
synchronized Scheduler.Cancellable getRenewable() {
return renewable;
}
ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers,
ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler, final LongSupplier relativeTimeProvider) {
super(id, type, action, description, parentTask, headers);
@ -121,7 +128,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
final long followerMaxSeqNo) {
/*
* While this should only ever be called once and before any other threads can touch these fields, we use synchronization here to
* avoid the need to declare these fields as volatile. That is, we are ensuring thesefields are always accessed under the same lock.
* avoid the need to declare these fields as volatile. That is, we are ensuring these fields are always accessed under the same
* lock.
*/
synchronized (this) {
this.followerHistoryUUID = followerHistoryUUID;
@ -130,6 +138,11 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
this.followerGlobalCheckpoint = followerGlobalCheckpoint;
this.followerMaxSeqNo = followerMaxSeqNo;
this.lastRequestedSeqNo = followerGlobalCheckpoint;
renewable = scheduleBackgroundRetentionLeaseRenewal(() -> {
synchronized (ShardFollowNodeTask.this) {
return this.followerGlobalCheckpoint;
}
});
}
// updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical
@ -507,8 +520,16 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
protected abstract void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer<ShardChangesAction.Response> handler,
Consumer<Exception> errorHandler);
protected abstract Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(LongSupplier followerGlobalCheckpoint);
@Override
protected void onCancelled() {
synchronized (this) {
if (renewable != null) {
renewable.cancel();
renewable = null;
}
}
markAsCompleted();
}

View File

@ -9,6 +9,8 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@ -32,10 +34,14 @@ import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
@ -45,9 +51,11 @@ import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.NoSuchRemoteClusterException;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrRetentionLeases;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
@ -60,6 +68,7 @@ import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import static org.elasticsearch.xpack.ccr.CcrLicenseChecker.wrapClient;
@ -73,6 +82,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final IndexScopedSettings indexScopedSettings;
private final TimeValue retentionLeaseRenewInterval;
private volatile TimeValue waitForMetadataTimeOut;
public ShardFollowTasksExecutor(Client client,
@ -84,6 +94,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
this.threadPool = threadPool;
this.clusterService = clusterService;
this.indexScopedSettings = settingsModule.getIndexScopedSettings();
this.retentionLeaseRenewInterval = CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(settingsModule.getSettings());
this.waitForMetadataTimeOut = CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT.get(settingsModule.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT,
newVal -> this.waitForMetadataTimeOut = newVal);
@ -245,6 +256,96 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
errorHandler.accept(e);
}
}
@Override
protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final LongSupplier followerGlobalCheckpoint) {
final String retentionLeaseId = CcrRetentionLeases.retentionLeaseId(
clusterService.getClusterName().value(),
params.getFollowShardId().getIndex(),
params.getRemoteCluster(),
params.getLeaderShardId().getIndex());
/*
* We are going to attempt to renew the retention lease. If this fails it is either because the retention lease does not
* exist, or something else happened. If the retention lease does not exist, we will attempt to add the retention lease
* again. If that fails, it had better not be because the retention lease already exists. Either way, we will attempt to
* renew again on the next scheduled execution.
*/
final ActionListener<RetentionLeaseActions.Response> listener = ActionListener.wrap(
r -> {},
e -> {
/*
* We have to guard against the possibility that the shard follow node task has been stopped and the retention
* lease deliberately removed via the act of unfollowing. Note that the order of operations is important in
* TransportUnfollowAction. There, we first stop the shard follow node task, and then remove the retention
* leases on the leader. This means that if we end up here with the retention lease not existing because of an
* unfollow action, then we know that the unfollow action has already stopped the shard follow node task and
* there is no race condition with the unfollow action.
*/
if (isCancelled() || isCompleted()) {
return;
}
final Throwable cause = ExceptionsHelper.unwrapCause(e);
logRetentionLeaseFailure(retentionLeaseId, cause);
// noinspection StatementWithEmptyBody
if (cause instanceof RetentionLeaseNotFoundException) {
// note that we do not need to mark as system context here as that is restored from the original renew
logger.trace(
"{} background adding retention lease [{}] while following",
params.getFollowShardId(),
retentionLeaseId);
CcrRetentionLeases.asyncAddRetentionLease(
params.getLeaderShardId(),
retentionLeaseId,
followerGlobalCheckpoint.getAsLong(),
remoteClient(params),
ActionListener.wrap(
r -> {},
inner -> {
/*
* If this fails that the retention lease already exists, something highly unusual is
* going on. Log it, and renew again after another renew interval has passed.
*/
final Throwable innerCause = ExceptionsHelper.unwrapCause(inner);
assert innerCause instanceof RetentionLeaseAlreadyExistsException == false;
logRetentionLeaseFailure(retentionLeaseId, innerCause);
}));
} else {
// if something else happened, we will attempt to renew again after another renew interval has passed
}
});
return threadPool.scheduleWithFixedDelay(
() -> {
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we have to execute under the system context so that if security is enabled the management is authorized
threadContext.markAsSystemContext();
logger.trace(
"{} background renewing retention lease [{}] while following",
params.getFollowShardId(),
retentionLeaseId);
CcrRetentionLeases.asyncRenewRetentionLease(
params.getLeaderShardId(),
retentionLeaseId,
followerGlobalCheckpoint.getAsLong(),
remoteClient(params),
listener);
}
},
retentionLeaseRenewInterval,
Ccr.CCR_THREAD_POOL_NAME);
}
private void logRetentionLeaseFailure(final String retentionLeaseId, final Throwable cause) {
assert cause instanceof ElasticsearchSecurityException == false : cause;
logger.warn(new ParameterizedMessage(
"{} background management of retention lease [{}] failed while following",
params.getFollowShardId(),
retentionLeaseId),
cause);
}
};
}

View File

@ -35,7 +35,6 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@ -89,11 +88,11 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId;
import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncAddRetentionLease;
@ -330,6 +329,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
CcrRetentionLeases.asyncRenewRetentionLease(
leaderShardId,
retentionLeaseId,
RETAIN_ALL,
remoteClient,
ActionListener.wrap(
r -> {},
@ -343,7 +343,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
}));
}
},
RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(indexShard.indexSettings().getSettings()),
CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(indexShard.indexSettings().getNodeSettings()),
Ccr.CCR_THREAD_POOL_NAME);
// TODO: There should be some local timeout. And if the remote cluster returns an unknown session
@ -380,7 +380,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
() -> new ParameterizedMessage("{} requesting leader to add retention lease [{}]", shardId, retentionLeaseId));
final TimeValue timeout = ccrSettings.getRecoveryActionTimeout();
final Optional<RetentionLeaseAlreadyExistsException> maybeAddAlready =
syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, timeout);
syncAddRetentionLease(leaderShardId, retentionLeaseId, RETAIN_ALL, remoteClient, timeout);
maybeAddAlready.ifPresent(addAlready -> {
logger.trace(() -> new ParameterizedMessage(
"{} retention lease [{}] already exists, requesting a renewal",
@ -388,7 +388,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
retentionLeaseId),
addAlready);
final Optional<RetentionLeaseNotFoundException> maybeRenewNotFound =
syncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, timeout);
syncRenewRetentionLease(leaderShardId, retentionLeaseId, RETAIN_ALL, remoteClient, timeout);
maybeRenewNotFound.ifPresent(renewNotFound -> {
logger.trace(() -> new ParameterizedMessage(
"{} retention lease [{}] not found while attempting to renew, requesting a final add",
@ -396,7 +396,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
retentionLeaseId),
renewNotFound);
final Optional<RetentionLeaseAlreadyExistsException> maybeFallbackAddAlready =
syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, timeout);
syncAddRetentionLease(leaderShardId, retentionLeaseId, RETAIN_ALL, remoteClient, timeout);
maybeFallbackAddAlready.ifPresent(fallbackAddAlready -> {
/*
* At this point we tried to add the lease and the retention lease already existed. By the time we tried to renew the
@ -409,15 +409,6 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
});
}
// this setting is intentionally not registered, it is only used in tests
public static final Setting<TimeValue> RETENTION_LEASE_RENEW_INTERVAL_SETTING =
Setting.timeSetting(
"index.ccr.retention_lease.renew_interval",
new TimeValue(5, TimeUnit.MINUTES),
new TimeValue(0, TimeUnit.MILLISECONDS),
Setting.Property.Dynamic,
Setting.Property.IndexScope);
@Override
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId leaderShardId) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);

View File

@ -131,6 +131,14 @@ public abstract class CcrIntegTestCase extends ESTestCase {
return Collections.emptyList();
}
protected Settings leaderClusterSettings() {
return Settings.EMPTY;
}
protected Settings followerClusterSettings() {
return Settings.EMPTY;
}
@Before
public final void startClusters() throws Exception {
if (clusterGroup != null && reuseClusters()) {
@ -145,7 +153,7 @@ public abstract class CcrIntegTestCase extends ESTestCase {
MockNioTransportPlugin.class, InternalSettingsPlugin.class);
InternalTestCluster leaderCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
numberOfNodesPerCluster(), "leader_cluster", createNodeConfigurationSource(null), 0, "leader", mockPlugins,
numberOfNodesPerCluster(), "leader_cluster", createNodeConfigurationSource(null, true), 0, "leader", mockPlugins,
Function.identity());
leaderCluster.beforeTest(random(), 0.0D);
leaderCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster());
@ -156,7 +164,7 @@ public abstract class CcrIntegTestCase extends ESTestCase {
String address = leaderCluster.getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
InternalTestCluster followerCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
numberOfNodesPerCluster(), "follower_cluster", createNodeConfigurationSource(address), 0, "follower",
numberOfNodesPerCluster(), "follower_cluster", createNodeConfigurationSource(address, false), 0, "follower",
mockPlugins, Function.identity());
clusterGroup = new ClusterGroup(leaderCluster, followerCluster);
@ -203,7 +211,7 @@ public abstract class CcrIntegTestCase extends ESTestCase {
}
}
private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedAddress) {
private NodeConfigurationSource createNodeConfigurationSource(final String leaderSeedAddress, final boolean leaderCluster) {
Settings.Builder builder = Settings.builder();
builder.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE);
// Default the watermarks to absurdly low to prevent the tests
@ -225,6 +233,11 @@ public abstract class CcrIntegTestCase extends ESTestCase {
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
// Let cluster state api return quickly in order to speed up auto follow tests:
builder.put(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100));
if (leaderCluster) {
builder.put(leaderClusterSettings());
} else {
builder.put(followerClusterSettings());
}
if (configureRemoteClusterViaNodeSettings() && leaderSeedAddress != null) {
builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress);
}

View File

@ -21,6 +21,7 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@ -33,6 +34,7 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
@ -40,14 +42,19 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
import java.io.IOException;
@ -83,7 +90,7 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(CcrRepository.RETENTION_LEASE_RENEW_INTERVAL_SETTING);
return Collections.singletonList(CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING);
}
}
@ -105,6 +112,13 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
.collect(Collectors.toList());
}
@Override
protected Settings followerClusterSettings() {
return Settings.builder()
.put(CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200))
.build();
}
private final IndicesOptions indicesOptions = IndicesOptions.strictSingleIndexNoExpandForbidClosed();
private RestoreSnapshotRequest setUpRestoreSnapshotRequest(
@ -140,7 +154,6 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
final Settings.Builder settingsBuilder = Settings.builder()
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex)
.put(CcrRepository.RETENTION_LEASE_RENEW_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200))
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
return new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST)
.indexSettings(settingsBuilder)
@ -227,42 +240,7 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
try {
// ensure that a retention lease has been put in place on each shard, and grab a copy of them
final List<RetentionLeases> retentionLeases = new ArrayList<>();
assertBusy(() -> {
retentionLeases.clear();
final IndicesStatsResponse stats =
leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
assertNotNull(stats.getShards());
assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
final List<ShardStats> shardsStats = getShardsStats(stats);
for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases();
assertThat(currentRetentionLeases.leases(), hasSize(1));
final RetentionLease retentionLease =
currentRetentionLeases.leases().iterator().next();
assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex)));
retentionLeases.add(currentRetentionLeases);
}
});
// now ensure that the retention leases are being renewed
assertBusy(() -> {
final IndicesStatsResponse stats =
leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
assertNotNull(stats.getShards());
assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
final List<ShardStats> shardsStats = getShardsStats(stats);
for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases();
assertThat(currentRetentionLeases.leases(), hasSize(1));
final RetentionLease retentionLease =
currentRetentionLeases.leases().iterator().next();
assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex)));
// we assert that retention leases are being renewed by an increase in the timestamp
assertThat(retentionLease.timestamp(), greaterThan(retentionLeases.get(i).leases().iterator().next().timestamp()));
}
});
assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex);
latch.countDown();
} finally {
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getDataNodes().values()) {
@ -354,15 +332,7 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
* After we wake up, it should be the case that the retention leases are the same (same timestamp) as that indicates that they were
* not renewed while we were sleeping.
*/
final TimeValue renewIntervalSetting = CcrRepository.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(
followerClient()
.admin()
.indices()
.prepareGetSettings(followerIndex)
.get()
.getIndexToSettings()
.get(followerIndex));
final TimeValue renewIntervalSetting = CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(followerClusterSettings());
final long renewEnd = System.nanoTime();
Thread.sleep(Math.max(0, randomIntBetween(2, 4) * renewIntervalSetting.millis() - TimeUnit.NANOSECONDS.toMillis(renewEnd - start)));
@ -404,6 +374,7 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
final String leaderIndexSettings =
getIndexSettings(numberOfShards, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get());
ensureLeaderYellow(leaderIndex);
final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex);
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
@ -469,11 +440,8 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
});
}
pauseFollow(followerIndex);
followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet();
assertAcked(followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet());
assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet());
final IndicesStatsResponse afterUnfollowStats =
@ -498,6 +466,7 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
final String leaderIndexSettings =
getIndexSettings(numberOfShards, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get());
ensureLeaderYellow(leaderIndex);
final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex);
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
@ -560,6 +529,438 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
}
}
public void testRetentionLeaseRenewedWhileFollowing() throws Exception {
final String leaderIndex = "leader";
final String followerIndex = "follower";
final int numberOfShards = randomIntBetween(1, 4);
final int numberOfReplicas = randomIntBetween(0, 1);
final Map<String, String> additionalIndexSettings = new HashMap<>();
additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true));
additionalIndexSettings.put(
IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(),
TimeValue.timeValueMillis(200).getStringRep());
final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings);
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get());
ensureLeaderYellow(leaderIndex);
final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex);
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
ensureFollowerGreen(true, followerIndex);
assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex);
}
public void testRetentionLeaseAdvancesWhileFollowing() throws Exception {
final String leaderIndex = "leader";
final String followerIndex = "follower";
final int numberOfShards = randomIntBetween(1, 4);
final int numberOfReplicas = randomIntBetween(0, 1);
final Map<String, String> additionalIndexSettings = new HashMap<>();
additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true));
additionalIndexSettings.put(
IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(),
TimeValue.timeValueMillis(200).getStringRep());
final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings);
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get());
ensureLeaderYellow(leaderIndex);
final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex);
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
ensureFollowerGreen(true, followerIndex);
final int numberOfDocuments = randomIntBetween(128, 2048);
logger.debug("indexing [{}] docs", numberOfDocuments);
for (int i = 0; i < numberOfDocuments; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
leaderClient().prepareIndex(leaderIndex, "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
if (rarely()) {
leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get();
}
}
// wait until the follower global checkpoints have caught up to the leader
assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex);
final List<ShardStats> leaderShardsStats = getShardsStats(leaderClient().admin().indices().prepareStats(leaderIndex).get());
final Map<Integer, Long> leaderGlobalCheckpoints = new HashMap<>();
for (final ShardStats leaderShardStats : leaderShardsStats) {
final ShardRouting routing = leaderShardStats.getShardRouting();
if (routing.primary() == false) {
continue;
}
leaderGlobalCheckpoints.put(routing.id(), leaderShardStats.getSeqNoStats().getGlobalCheckpoint());
}
// now assert that the retention leases have advanced to the global checkpoints
assertBusy(() -> {
final IndicesStatsResponse stats =
leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
assertNotNull(stats.getShards());
assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
final List<ShardStats> shardsStats = getShardsStats(stats);
for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases();
assertThat(currentRetentionLeases.leases(), hasSize(1));
final RetentionLease retentionLease =
currentRetentionLeases.leases().iterator().next();
assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex)));
// we assert that retention leases are being advanced
assertThat(
retentionLease.retainingSequenceNumber(),
equalTo(leaderGlobalCheckpoints.get(shardsStats.get(i).getShardRouting().id())));
}
});
}
@TestLogging(value = "org.elasticsearch.xpack.ccr:trace")
public void testRetentionLeaseRenewalIsCancelledWhenFollowingIsPaused() throws Exception {
final String leaderIndex = "leader";
final String followerIndex = "follower";
final int numberOfShards = randomIntBetween(1, 4);
final int numberOfReplicas = randomIntBetween(0, 1);
final Map<String, String> additionalIndexSettings = new HashMap<>();
additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true));
additionalIndexSettings.put(
IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(),
TimeValue.timeValueMillis(200).getStringRep());
final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings);
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get());
ensureLeaderYellow(leaderIndex);
final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex);
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
ensureFollowerGreen(true, followerIndex);
final long start = System.nanoTime();
pauseFollow(followerIndex);
/*
* We want to ensure that the retention leases have been synced to all shard copies, as otherwise they might sync between the two
* times that we sample the retention leases, which would cause our check to fail.
*/
final TimeValue syncIntervalSetting = IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.get(
leaderClient()
.admin()
.indices()
.prepareGetSettings(leaderIndex)
.get()
.getIndexToSettings()
.get(leaderIndex));
final long syncEnd = System.nanoTime();
Thread.sleep(Math.max(0, randomIntBetween(2, 4) * syncIntervalSetting.millis() - TimeUnit.NANOSECONDS.toMillis(syncEnd - start)));
final ClusterStateResponse leaderIndexClusterState =
leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get();
final String leaderUUID = leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID();
// sample the leases after pausing
final List<RetentionLeases> retentionLeases = new ArrayList<>();
assertBusy(() -> {
retentionLeases.clear();
final IndicesStatsResponse stats =
leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
assertNotNull(stats.getShards());
assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
final List<ShardStats> shardsStats = getShardsStats(stats);
for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases();
assertThat(currentRetentionLeases.leases(), hasSize(1));
final ClusterStateResponse followerIndexClusterState =
followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get();
final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID();
final RetentionLease retentionLease =
currentRetentionLeases.leases().iterator().next();
final String expectedRetentionLeaseId = retentionLeaseId(
getFollowerCluster().getClusterName(),
new Index(followerIndex, followerUUID),
getLeaderCluster().getClusterName(),
new Index(leaderIndex, leaderUUID));
assertThat(retentionLease.id(), equalTo(expectedRetentionLeaseId));
retentionLeases.add(currentRetentionLeases);
}
});
/*
* We want to ensure that the background renewal is cancelled after pausing. To do this, we will sleep a small multiple of the renew
* interval. If the renews are not cancelled, we expect that a renewal would have been sent while we were sleeping. After we wake
* up, it should be the case that the retention leases are the same (same timestamp) as that indicates that they were not renewed
* while we were sleeping.
*/
final TimeValue renewIntervalSetting = CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(followerClusterSettings());
final long renewEnd = System.nanoTime();
Thread.sleep(Math.max(0, randomIntBetween(2, 4) * renewIntervalSetting.millis() - TimeUnit.NANOSECONDS.toMillis(renewEnd - start)));
// now ensure that the retention leases are the same
assertBusy(() -> {
final IndicesStatsResponse stats =
leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
assertNotNull(stats.getShards());
assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
final List<ShardStats> shardsStats = getShardsStats(stats);
for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
if (shardsStats.get(i).getShardRouting().primary() == false) {
continue;
}
final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases();
assertThat(currentRetentionLeases.leases(), hasSize(1));
final ClusterStateResponse followerIndexClusterState =
followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get();
final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID();
final RetentionLease retentionLease =
currentRetentionLeases.leases().iterator().next();
assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID)));
// we assert that retention leases are not being renewed by an unchanged timestamp
assertThat(retentionLease.timestamp(), equalTo(retentionLeases.get(i).leases().iterator().next().timestamp()));
}
});
}
public void testRetentionLeaseRenewalIsResumedWhenFollowingIsResumed() throws Exception {
final String leaderIndex = "leader";
final String followerIndex = "follower";
final int numberOfShards = randomIntBetween(1, 4);
final int numberOfReplicas = randomIntBetween(0, 1);
final Map<String, String> additionalIndexSettings = new HashMap<>();
additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true));
additionalIndexSettings.put(
IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(),
TimeValue.timeValueMillis(200).getStringRep());
final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings);
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get());
ensureLeaderYellow(leaderIndex);
final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex);
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
ensureFollowerGreen(true, followerIndex);
pauseFollow(followerIndex);
followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow(followerIndex)).actionGet();
ensureFollowerGreen(true, followerIndex);
assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex);
}
public void testRetentionLeaseIsAddedIfItDisappearsWhileFollowing() throws Exception {
final String leaderIndex = "leader";
final String followerIndex = "follower";
final int numberOfShards = 1;
final int numberOfReplicas = 1;
final Map<String, String> additionalIndexSettings = new HashMap<>();
additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true));
additionalIndexSettings.put(
IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(),
TimeValue.timeValueMillis(200).getStringRep());
final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings);
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get());
ensureLeaderYellow(leaderIndex);
final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex);
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
ensureFollowerGreen(true, followerIndex);
final CountDownLatch latch = new CountDownLatch(1);
final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get();
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getNodes().values()) {
final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
senderTransportService.addSendBehavior(
(connection, requestId, action, request, options) -> {
if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action)
|| TransportActionProxy.getProxyAction(RetentionLeaseActions.Renew.ACTION_NAME).equals(action)) {
senderTransportService.clearAllRules();
final RetentionLeaseActions.RenewRequest renewRequest = (RetentionLeaseActions.RenewRequest) request;
final String primaryShardNodeId =
getLeaderCluster()
.clusterService()
.state()
.routingTable()
.index(leaderIndex)
.shard(renewRequest.getShardId().id())
.primaryShard()
.currentNodeId();
final String primaryShardNodeName =
getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName();
final IndexShard primary =
getLeaderCluster()
.getInstance(IndicesService.class, primaryShardNodeName)
.getShardOrNull(renewRequest.getShardId());
final CountDownLatch innerLatch = new CountDownLatch(1);
// this forces the background renewal from following to face a retention lease not found exception
primary.removeRetentionLease(
getRetentionLeaseId(followerIndex, leaderIndex),
ActionListener.wrap(r -> innerLatch.countDown(), e -> fail(e.toString())));
try {
innerLatch.await();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
fail(e.toString());
}
latch.countDown();
}
connection.sendRequest(requestId, action, request, options);
});
}
latch.await();
assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex);
}
/**
* This test is fairly evil. This test is to ensure that we are protected against a race condition when unfollowing and a background
* renewal fires. The action of unfollowing will remove retention leases from the leader. If a background renewal is firing at that
* time, it means that we will be met with a retention lease not found exception. That will in turn trigger behavior to attempt to
* re-add the retention lease, which means we are left in a situation where we have unfollowed, but the retention lease still remains
* on the leader. However, we have a guard against this in the callback after the retention lease not found exception is thrown, which
* checks if the shard follow node task is cancelled or completed.
*
* To test this this behavior is correct, we capture the call to renew the retention lease. Then, we will step in between and execute
* an unfollow request. This will remove the retention lease on the leader. At this point, we can unlatch the renew call, which will
* now be met with a retention lease not found exception. We will cheat and wait for that response to come back, and then synchronously
* trigger the listener which will check to see if the shard follow node task is cancelled or completed, and if not, add the retention
* lease back. After that listener returns, we can check to see if a retention lease exists on the leader.
*
* Note, this done mean that listener will fire twice, once in our onResponseReceived hook, and once after our onResponseReceived
* callback returns. 🤷
*
* @throws Exception if an exception occurs in the main test thread
*/
public void testPeriodicRenewalDoesNotAddRetentionLeaseAfterUnfollow() throws Exception {
final String leaderIndex = "leader";
final String followerIndex = "follower";
final int numberOfShards = 1;
final int numberOfReplicas = 1;
final Map<String, String> additionalIndexSettings = new HashMap<>();
additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true));
additionalIndexSettings.put(
IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(),
TimeValue.timeValueMillis(200).getStringRep());
final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings);
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get());
ensureLeaderYellow(leaderIndex);
final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex);
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
ensureFollowerGreen(true, followerIndex);
final CountDownLatch removeLeaseLatch = new CountDownLatch(1);
final CountDownLatch unfollowLatch = new CountDownLatch(1);
final CountDownLatch responseLatch = new CountDownLatch(1);
final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get();
try {
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getNodes().values()) {
final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
senderTransportService.addSendBehavior(
(connection, requestId, action, request, options) -> {
if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action)
|| TransportActionProxy.getProxyAction(RetentionLeaseActions.Renew.ACTION_NAME).equals(action)) {
final String retentionLeaseId = getRetentionLeaseId(followerIndex, leaderIndex);
try {
removeLeaseLatch.countDown();
unfollowLatch.await();
senderTransportService.transport().addMessageListener(new TransportMessageListener() {
@SuppressWarnings("rawtypes")
@Override
public void onResponseReceived(
final long responseRequestId,
final Transport.ResponseContext context) {
if (requestId == responseRequestId) {
final RetentionLeaseNotFoundException e =
new RetentionLeaseNotFoundException(retentionLeaseId);
context.handler().handleException(new RemoteTransportException(e.getMessage(), e));
responseLatch.countDown();
senderTransportService.transport().removeMessageListener(this);
}
}
});
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
fail(e.toString());
}
}
connection.sendRequest(requestId, action, request, options);
});
}
removeLeaseLatch.await();
pauseFollow(followerIndex);
assertAcked(followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet());
assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet());
unfollowLatch.countDown();
responseLatch.await();
final IndicesStatsResponse afterUnfollowStats =
leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
final List<ShardStats> afterUnfollowShardsStats = getShardsStats(afterUnfollowStats);
for (final ShardStats shardStats : afterUnfollowShardsStats) {
assertThat(shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty());
}
} finally {
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getDataNodes().values()) {
final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
senderTransportService.clearAllRules();
}
}
}
private void assertRetentionLeaseRenewal(
final int numberOfShards,
final int numberOfReplicas,
final String followerIndex,
final String leaderIndex) throws Exception {
// ensure that a retention lease has been put in place on each shard, and grab a copy of them
final List<RetentionLeases> retentionLeases = new ArrayList<>();
assertBusy(() -> {
retentionLeases.clear();
final IndicesStatsResponse stats =
leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
assertNotNull(stats.getShards());
assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
final List<ShardStats> shardsStats = getShardsStats(stats);
for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases();
assertThat(currentRetentionLeases.leases(), hasSize(1));
final RetentionLease retentionLease =
currentRetentionLeases.leases().iterator().next();
assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex)));
retentionLeases.add(currentRetentionLeases);
}
});
// now ensure that the retention leases are being renewed
assertBusy(() -> {
final IndicesStatsResponse stats =
leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
assertNotNull(stats.getShards());
assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
final List<ShardStats> shardsStats = getShardsStats(stats);
for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases();
assertThat(currentRetentionLeases.leases(), hasSize(1));
final RetentionLease retentionLease =
currentRetentionLeases.leases().iterator().next();
assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex)));
// we assert that retention leases are being renewed by an increase in the timestamp
assertThat(retentionLease.timestamp(), greaterThan(retentionLeases.get(i).leases().iterator().next().timestamp()));
}
});
}
/**
* Extract the shard stats from an indices stats response, with the stats ordered by shard ID with primaries first. This is to have a
* consistent ordering when comparing two responses.

View File

@ -14,6 +14,7 @@ import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.equalTo;
@ -177,6 +179,23 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
threadPool.generic().execute(task);
}
@Override
protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final LongSupplier followerGlobalCheckpoint) {
return new Scheduler.Cancellable() {
@Override
public boolean cancel() {
return true;
}
@Override
public boolean isCancelled() {
return true;
}
};
}
@Override
protected boolean isStopped() {
return stopped.get();

View File

@ -8,13 +8,16 @@ package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
@ -27,12 +30,17 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.contains;
@ -53,6 +61,9 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
private Consumer<ShardFollowNodeTaskStatus> beforeSendShardChangesRequest = status -> {};
private AtomicBoolean scheduleRetentionLeaseRenewal = new AtomicBoolean();
private LongConsumer retentionLeaseRenewal = followerGlobalCheckpoint -> {};
private AtomicBoolean simulateResponse = new AtomicBoolean();
private Queue<Exception> readFailures;
@ -936,6 +947,28 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
assertThat(ShardFollowNodeTask.computeDelay(1024, maxDelayInMillis), allOf(greaterThanOrEqualTo(0L), lessThanOrEqualTo(1000L)));
}
public void testRetentionLeaseRenewal() throws InterruptedException {
scheduleRetentionLeaseRenewal.set(true);
final CountDownLatch latch = new CountDownLatch(1);
final long expectedFollowerGlobalChekcpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
retentionLeaseRenewal = followerGlobalCheckpoint -> {
assertThat(followerGlobalCheckpoint, equalTo(expectedFollowerGlobalChekcpoint));
latch.countDown();
};
final ShardFollowTaskParams params = new ShardFollowTaskParams();
final ShardFollowNodeTask task = createShardFollowTask(params);
try {
startTask(task, randomLongBetween(expectedFollowerGlobalChekcpoint, Long.MAX_VALUE), expectedFollowerGlobalChekcpoint);
latch.await();
} finally {
task.onCancelled();
scheduleRetentionLeaseRenewal.set(false);
}
}
static final class ShardFollowTaskParams {
private String remoteCluster = null;
private ShardId followShardId = new ShardId("follow_index", "", 0);
@ -1063,6 +1096,47 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
}
}
@Override
protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final LongSupplier followerGlobalCheckpoint) {
if (scheduleRetentionLeaseRenewal.get()) {
final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
final ScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(
() -> retentionLeaseRenewal.accept(followerGlobalCheckpoint.getAsLong()),
0,
TimeValue.timeValueMillis(200).millis(),
TimeUnit.MILLISECONDS);
return new Scheduler.Cancellable() {
@Override
public boolean cancel() {
final boolean cancel = future.cancel(true);
scheduler.shutdown();
return cancel;
}
@Override
public boolean isCancelled() {
return future.isCancelled();
}
};
} else {
return new Scheduler.Cancellable() {
@Override
public boolean cancel() {
return true;
}
@Override
public boolean isCancelled() {
return true;
}
};
}
}
@Override
protected boolean isStopped() {
return super.isStopped() || stopped.get();

View File

@ -16,6 +16,7 @@ import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -47,7 +48,9 @@ import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.CcrRetentionLeases;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
@ -69,6 +72,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
@ -390,6 +394,28 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
}
}
public void testRetentionLeaseManagement() throws Exception {
try (ReplicationGroup leader = createLeaderGroup(0)) {
leader.startAll();
try (ReplicationGroup follower = createFollowGroup(leader, 0)) {
follower.startAll();
final ShardFollowNodeTask task = createShardFollowTask(leader, follower);
task.start(
follower.getPrimary().getHistoryUUID(),
leader.getPrimary().getGlobalCheckpoint(),
leader.getPrimary().seqNoStats().getMaxSeqNo(),
follower.getPrimary().getGlobalCheckpoint(),
follower.getPrimary().seqNoStats().getMaxSeqNo());
final Scheduler.Cancellable renewable = task.getRenewable();
assertNotNull(renewable);
assertFalse(renewable.isCancelled());
task.onCancelled();
assertTrue(renewable.isCancelled());
assertNull(task.getRenewable());
}
}
}
private ReplicationGroup createLeaderGroup(int replicas) throws IOException {
Settings settings = Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
@ -399,10 +425,12 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
}
private ReplicationGroup createFollowGroup(ReplicationGroup leaderGroup, int replicas) throws IOException {
Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(between(1, 1000), ByteSizeUnit.KB))
.build();
final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(
IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(),
new ByteSizeValue(between(1, 1000), ByteSizeUnit.KB))
.build();
IndexMetaData indexMetaData = buildIndexMetaData(replicas, settings, indexMapping);
return new ReplicationGroup(indexMetaData) {
@Override
@ -543,6 +571,27 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
threadPool.executor(ThreadPool.Names.GENERIC).execute(task);
}
@Override
protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final LongSupplier followerGlobalCheckpoint) {
final String retentionLeaseId = CcrRetentionLeases.retentionLeaseId(
"follower",
followerGroup.getPrimary().routingEntry().index(),
"remote",
leaderGroup.getPrimary().routingEntry().index());
final PlainActionFuture<ReplicationResponse> response = new PlainActionFuture<>();
leaderGroup.addRetentionLease(
retentionLeaseId,
followerGlobalCheckpoint.getAsLong(),
"ccr",
ActionListener.wrap(response::onResponse, e -> fail(e.toString())));
response.actionGet();
return threadPool.scheduleWithFixedDelay(
() -> leaderGroup.renewRetentionLease(retentionLeaseId, followerGlobalCheckpoint.getAsLong(), "ccr"),
CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(
followerGroup.getPrimary().indexSettings().getSettings()),
ThreadPool.Names.GENERIC);
}
@Override
protected boolean isStopped() {
return super.isStopped() || stopped.get();