Remove retention leases when unfollowing (#39088)

This commit attempts to remove the retention leases on the leader shards
when unfollowing an index. This is best effort, since the leader might
not be available.
This commit is contained in:
Jason Tedor 2019-02-20 07:05:47 -05:00
parent eae2c9dd5c
commit 09ea3ccd16
No known key found for this signature in database
GPG Key ID: FA89F05560F16BC5
9 changed files with 487 additions and 106 deletions

View File

@ -6,9 +6,20 @@
package org.elasticsearch.xpack.ccr;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import java.util.Locale;
import java.util.Optional;
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
public class CcrRetentionLeases {
@ -37,4 +48,113 @@ public class CcrRetentionLeases {
leaderIndex.getUUID());
}
/**
* Synchronously requests to add a retention lease with the specified retention lease ID on the specified leader shard using the given
* remote client. Note that this method will block up to the specified timeout.
*
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param remoteClient the remote client on which to execute this request
* @param timeout the timeout
* @return an optional exception indicating whether or not the retention lease already exists
*/
public static Optional<RetentionLeaseAlreadyExistsException> syncAddRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient,
final TimeValue timeout) {
try {
final PlainActionFuture<RetentionLeaseActions.Response> response = new PlainActionFuture<>();
asyncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response);
response.actionGet(timeout);
return Optional.empty();
} catch (final RetentionLeaseAlreadyExistsException e) {
return Optional.of(e);
}
}
/**
* Asynchronously requests to add a retention lease with the specified retention lease ID on the specified leader shard using the given
* remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a response
* or failure.
*
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param remoteClient the remote client on which to execute this request
* @param listener the listener
*/
public static void asyncAddRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient,
final ActionListener<RetentionLeaseActions.Response> listener) {
final RetentionLeaseActions.AddRequest request =
new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr");
remoteClient.execute(RetentionLeaseActions.Add.INSTANCE, request, listener);
}
/**
* Synchronously requests to renew a retention lease with the specified retention lease ID on the specified leader shard using the given
* remote client. Note that this method will block up to the specified timeout.
*
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param remoteClient the remote client on which to execute this request
* @param timeout the timeout
* @return an optional exception indicating whether or not the retention lease already exists
*/
public static Optional<RetentionLeaseNotFoundException> syncRenewRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient,
final TimeValue timeout) {
try {
final PlainActionFuture<RetentionLeaseActions.Response> response = new PlainActionFuture<>();
asyncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response);
response.actionGet(timeout);
return Optional.empty();
} catch (final RetentionLeaseNotFoundException e) {
return Optional.of(e);
}
}
/**
* Asynchronously requests to renew a retention lease with the specified retention lease ID on the specified leader shard using the
* given remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a
* response or failure.
*
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param remoteClient the remote client on which to execute this request
* @param listener the listener
*/
public static void asyncRenewRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient,
final ActionListener<RetentionLeaseActions.Response> listener) {
final RetentionLeaseActions.RenewRequest request =
new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr");
remoteClient.execute(RetentionLeaseActions.Renew.INSTANCE, request, listener);
}
/**
* Asynchronously requests to remove a retention lease with the specified retention lease ID on the specified leader shard using the
* given remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a
* response or failure.
*
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param remoteClient the remote client on which to execute this request
* @param listener the listener
*/
public static void asyncRemoveRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient,
final ActionListener<RetentionLeaseActions.Response> listener) {
final RetentionLeaseActions.RemoveRequest request = new RetentionLeaseActions.RemoveRequest(leaderShardId, retentionLeaseId);
remoteClient.execute(RetentionLeaseActions.Remove.INSTANCE, request, listener);
}
}

View File

@ -6,10 +6,16 @@
package org.elasticsearch.xpack.ccr.action;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
@ -20,22 +26,46 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrRetentionLeases;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
public class TransportUnfollowAction extends TransportMasterNodeAction<UnfollowAction.Request, AcknowledgedResponse> {
private final Client client;
@Inject
public TransportUnfollowAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(UnfollowAction.NAME, transportService, clusterService, threadPool, actionFilters,
UnfollowAction.Request::new, indexNameExpressionResolver);
public TransportUnfollowAction(
final TransportService transportService,
final ClusterService clusterService,
final ThreadPool threadPool,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver,
final Client client) {
super(
UnfollowAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
UnfollowAction.Request::new,
indexNameExpressionResolver);
this.client = Objects.requireNonNull(client);
}
@Override
@ -49,26 +79,128 @@ public class TransportUnfollowAction extends TransportMasterNodeAction<UnfollowA
}
@Override
protected void masterOperation(UnfollowAction.Request request,
ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
protected void masterOperation(
final UnfollowAction.Request request,
final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
clusterService.submitStateUpdateTask("unfollow_action", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState current) throws Exception {
public ClusterState execute(final ClusterState current) {
String followerIndex = request.getFollowerIndex();
return unfollow(followerIndex, current);
}
@Override
public void onFailure(String source, Exception e) {
public void onFailure(final String source, final Exception e) {
listener.onFailure(e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new AcknowledgedResponse(true));
public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) {
final IndexMetaData indexMetaData = oldState.metaData().index(request.getFollowerIndex());
final Map<String, String> ccrCustomMetaData = indexMetaData.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
final String remoteClusterName = ccrCustomMetaData.get(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY);
final Client remoteClient = client.getRemoteClusterClient(remoteClusterName);
final String leaderIndexName = ccrCustomMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY);
final String leaderIndexUuid = ccrCustomMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
final Index leaderIndex = new Index(leaderIndexName, leaderIndexUuid);
final String retentionLeaseId = CcrRetentionLeases.retentionLeaseId(
oldState.getClusterName().value(),
indexMetaData.getIndex(),
remoteClusterName,
leaderIndex);
final int numberOfShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexMetaData.getSettings());
final GroupedActionListener<RetentionLeaseActions.Response> groupListener = new GroupedActionListener<>(
new ActionListener<Collection<RetentionLeaseActions.Response>>() {
@Override
public void onResponse(final Collection<RetentionLeaseActions.Response> responses) {
logger.trace(
"[{}] removed retention lease [{}] on all leader primary shards",
indexMetaData.getIndex(),
retentionLeaseId);
listener.onResponse(new AcknowledgedResponse(true));
}
@Override
public void onFailure(final Exception e) {
logger.warn(new ParameterizedMessage(
"[{}] failure while removing retention lease [{}] on leader primary shards",
indexMetaData.getIndex(),
retentionLeaseId),
e);
final ElasticsearchException wrapper = new ElasticsearchException(e);
wrapper.addMetadata("es.failed_to_remove_retention_leases", retentionLeaseId);
listener.onFailure(wrapper);
}
},
numberOfShards,
Collections.emptyList());
for (int i = 0; i < numberOfShards; i++) {
final ShardId followerShardId = new ShardId(indexMetaData.getIndex(), i);
final ShardId leaderShardId = new ShardId(leaderIndex, i);
removeRetentionLeaseForShard(
followerShardId,
leaderShardId,
retentionLeaseId,
remoteClient,
ActionListener.wrap(
groupListener::onResponse,
e -> handleException(
followerShardId,
retentionLeaseId,
leaderShardId,
groupListener,
e)));
}
}
private void removeRetentionLeaseForShard(
final ShardId followerShardId,
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient,
final ActionListener<RetentionLeaseActions.Response> listener) {
logger.trace("{} removing retention lease [{}] while unfollowing leader index", followerShardId, retentionLeaseId);
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
// we have to execute under the system context so that if security is enabled the removal is authorized
threadContext.markAsSystemContext();
CcrRetentionLeases.asyncRemoveRetentionLease(leaderShardId, retentionLeaseId, remoteClient, listener);
}
}
private void handleException(
final ShardId followerShardId,
final String retentionLeaseId,
final ShardId leaderShardId,
final ActionListener<RetentionLeaseActions.Response> listener,
final Exception e) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
assert cause instanceof ElasticsearchSecurityException == false : e;
if (cause instanceof RetentionLeaseNotFoundException) {
// treat as success
logger.trace(new ParameterizedMessage(
"{} retention lease [{}] not found on {} while unfollowing",
followerShardId,
retentionLeaseId,
leaderShardId),
e);
listener.onResponse(new RetentionLeaseActions.Response());
} else {
logger.warn(new ParameterizedMessage(
"{} failed to remove retention lease [{}] on {} while unfollowing",
followerShardId,
retentionLeaseId,
leaderShardId),
e);
listener.onFailure(e);
}
}
});
}

View File

@ -44,7 +44,6 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
import org.elasticsearch.index.shard.IndexShard;
@ -70,6 +69,7 @@ import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.CcrRetentionLeases;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.action.CcrRequests;
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
@ -94,9 +94,10 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId;
import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncAddRetentionLease;
import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncRenewRetentionLease;
/**
@ -321,12 +322,12 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
// schedule renewals to run during the restore
final Scheduler.Cancellable renewable = threadPool.scheduleWithFixedDelay(
() -> {
logger.trace("{} background renewal of retention lease [{}] during restore", shardId, retentionLeaseId);
logger.trace("{} background renewal of retention lease [{}] during restore", indexShard.shardId(), retentionLeaseId);
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we have to execute under the system context so that if security is enabled the renewal is authorized
threadContext.markAsSystemContext();
asyncRenewRetentionLease(
CcrRetentionLeases.asyncRenewRetentionLease(
leaderShardId,
retentionLeaseId,
remoteClient,
@ -377,8 +378,9 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
final Client remoteClient) {
logger.trace(
() -> new ParameterizedMessage("{} requesting leader to add retention lease [{}]", shardId, retentionLeaseId));
final TimeValue timeout = ccrSettings.getRecoveryActionTimeout();
final Optional<RetentionLeaseAlreadyExistsException> maybeAddAlready =
syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient);
syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, timeout);
maybeAddAlready.ifPresent(addAlready -> {
logger.trace(() -> new ParameterizedMessage(
"{} retention lease [{}] already exists, requesting a renewal",
@ -386,7 +388,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
retentionLeaseId),
addAlready);
final Optional<RetentionLeaseNotFoundException> maybeRenewNotFound =
syncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient);
syncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, timeout);
maybeRenewNotFound.ifPresent(renewNotFound -> {
logger.trace(() -> new ParameterizedMessage(
"{} retention lease [{}] not found while attempting to renew, requesting a final add",
@ -394,7 +396,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
retentionLeaseId),
renewNotFound);
final Optional<RetentionLeaseAlreadyExistsException> maybeFallbackAddAlready =
syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient);
syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, timeout);
maybeFallbackAddAlready.ifPresent(fallbackAddAlready -> {
/*
* At this point we tried to add the lease and the retention lease already existed. By the time we tried to renew the
@ -407,54 +409,6 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
});
}
private Optional<RetentionLeaseAlreadyExistsException> syncAddRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient) {
try {
final PlainActionFuture<RetentionLeaseActions.Response> response = new PlainActionFuture<>();
asyncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response);
response.actionGet(ccrSettings.getRecoveryActionTimeout());
return Optional.empty();
} catch (final RetentionLeaseAlreadyExistsException e) {
return Optional.of(e);
}
}
private void asyncAddRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient,
final ActionListener<RetentionLeaseActions.Response> listener) {
final RetentionLeaseActions.AddRequest request =
new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr");
remoteClient.execute(RetentionLeaseActions.Add.INSTANCE, request, listener);
}
private Optional<RetentionLeaseNotFoundException> syncRenewRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient) {
try {
final PlainActionFuture<RetentionLeaseActions.Response> response = new PlainActionFuture<>();
asyncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response);
response.actionGet(ccrSettings.getRecoveryActionTimeout());
return Optional.empty();
} catch (final RetentionLeaseNotFoundException e) {
return Optional.of(e);
}
}
private void asyncRenewRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient,
final ActionListener<RetentionLeaseActions.Response> listener) {
final RetentionLeaseActions.RenewRequest request =
new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr");
remoteClient.execute(RetentionLeaseActions.Renew.INSTANCE, request, listener);
}
// this setting is intentionally not registered, it is only used in tests
public static final Setting<TimeValue> RETENTION_LEASE_RENEW_INTERVAL_SETTING =
Setting.timeSetting(

View File

@ -7,9 +7,12 @@
package org.elasticsearch.xpack.ccr;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
@ -29,15 +32,22 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
import java.io.IOException;
import java.util.ArrayList;
@ -45,17 +55,23 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
@ -148,26 +164,19 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
final PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
final ClusterStateResponse leaderIndexClusterState =
leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get();
final String leaderUUID = leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID();
// ensure that a retention lease has been put in place on each shard
assertBusy(() -> {
final IndicesStatsResponse stats =
leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
assertNotNull(stats.getShards());
assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
final List<ShardStats> shardStats = getShardStats(stats);
final List<ShardStats> shardsStats = getShardsStats(stats);
for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases();
final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases();
assertThat(currentRetentionLeases.leases(), hasSize(1));
final ClusterStateResponse followerIndexClusterState =
followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get();
final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID();
final RetentionLease retentionLease =
currentRetentionLeases.leases().iterator().next();
assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID)));
assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex)));
}
});
@ -215,7 +224,6 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
}
connection.sendRequest(requestId, action, request, options);
});
}
}
@ -223,10 +231,6 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
final PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
final ClusterStateResponse leaderIndexClusterState =
leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get();
final String leaderUUID = leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID();
try {
// ensure that a retention lease has been put in place on each shard, and grab a copy of them
final List<RetentionLeases> retentionLeases = new ArrayList<>();
@ -236,16 +240,13 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
assertNotNull(stats.getShards());
assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
final List<ShardStats> shardStats = getShardStats(stats);
final List<ShardStats> shardsStats = getShardsStats(stats);
for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases();
final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases();
assertThat(currentRetentionLeases.leases(), hasSize(1));
final ClusterStateResponse followerIndexClusterState =
followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get();
final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID();
final RetentionLease retentionLease =
currentRetentionLeases.leases().iterator().next();
assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID)));
assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex)));
retentionLeases.add(currentRetentionLeases);
}
});
@ -256,16 +257,13 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
assertNotNull(stats.getShards());
assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
final List<ShardStats> shardStats = getShardStats(stats);
final List<ShardStats> shardsStats = getShardsStats(stats);
for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases();
final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases();
assertThat(currentRetentionLeases.leases(), hasSize(1));
final ClusterStateResponse followerIndexClusterState =
followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get();
final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID();
final RetentionLease retentionLease =
currentRetentionLeases.leases().iterator().next();
assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID)));
assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex)));
// we assert that retention leases are being renewed by an increase in the timestamp
assertThat(retentionLease.timestamp(), greaterThan(retentionLeases.get(i).leases().iterator().next().timestamp()));
}
@ -321,9 +319,9 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
assertNotNull(stats.getShards());
assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
final List<ShardStats> shardStats = getShardStats(stats);
final List<ShardStats> shardsStats = getShardsStats(stats);
for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases();
final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases();
assertThat(currentRetentionLeases.leases(), hasSize(1));
final ClusterStateResponse followerIndexClusterState =
followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get();
@ -349,9 +347,9 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
assertNotNull(stats.getShards());
assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
final List<ShardStats> shardStats = getShardStats(stats);
final List<ShardStats> shardsStats = getShardsStats(stats);
for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases();
final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases();
assertThat(currentRetentionLeases.leases(), hasSize(1));
final ClusterStateResponse followerIndexClusterState =
followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get();
@ -371,6 +369,170 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
}
}
public void testUnfollowRemovesRetentionLeases() throws Exception {
final String leaderIndex = "leader";
final String followerIndex = "follower";
final String leaderIndexSettings =
getIndexSettings(randomIntBetween(1, 4), 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get());
final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex);
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
ensureFollowerGreen(true, followerIndex);
final String retentionLeaseId = getRetentionLeaseId(followerIndex, leaderIndex);
final IndicesStatsResponse stats =
leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
final List<ShardStats> shardsStats = getShardsStats(stats);
for (final ShardStats shardStats : shardsStats) {
assertThat(shardStats.getRetentionLeaseStats().retentionLeases().leases(), hasSize(1));
assertThat(
shardStats.getRetentionLeaseStats().retentionLeases().leases().iterator().next().id(),
equalTo(retentionLeaseId));
}
// we will sometimes fake that some of the retention leases are already removed on the leader shard
final Set<Integer> shardIds =
new HashSet<>(randomSubsetOf(randomIntBetween(0, 4), IntStream.range(0, 4).boxed().collect(Collectors.toSet())));
final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get();
try {
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getDataNodes().values()) {
final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
final ClusterStateResponse leaderClusterState =
leaderClient().admin().cluster().prepareState().clear().setNodes(true).get();
for (final ObjectCursor<DiscoveryNode> receiverNode : leaderClusterState.getState().nodes().getDataNodes().values()) {
final MockTransportService receiverTransportService =
(MockTransportService) getLeaderCluster().getInstance(TransportService.class, receiverNode.value.getName());
senderTransportService.addSendBehavior(receiverTransportService,
(connection, requestId, action, request, options) -> {
if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action)) {
final RetentionLeaseActions.RemoveRequest removeRequest = (RetentionLeaseActions.RemoveRequest) request;
if (shardIds.contains(removeRequest.getShardId().id())) {
final String primaryShardNodeId =
getLeaderCluster()
.clusterService()
.state()
.routingTable()
.index(leaderIndex)
.shard(removeRequest.getShardId().id())
.primaryShard()
.currentNodeId();
final String primaryShardNodeName =
getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName();
final IndexShard primary =
getLeaderCluster()
.getInstance(IndicesService.class, primaryShardNodeName)
.getShardOrNull(removeRequest.getShardId());
final CountDownLatch latch = new CountDownLatch(1);
primary.removeRetentionLease(
retentionLeaseId,
ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())));
try {
latch.await();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
fail(e.toString());
}
}
}
connection.sendRequest(requestId, action, request, options);
});
}
}
pauseFollow(followerIndex);
followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet();
assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet());
final IndicesStatsResponse afterUnfollowStats =
leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
final List<ShardStats> afterUnfollowShardsStats = getShardsStats(afterUnfollowStats);
for (final ShardStats shardStats : afterUnfollowShardsStats) {
assertThat(shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty());
}
} finally {
}
}
public void testUnfollowFailsToRemoveRetentionLeases() throws Exception {
final String leaderIndex = "leader";
final String followerIndex = "follower";
final int numberOfShards = randomIntBetween(1, 4);
final String leaderIndexSettings =
getIndexSettings(numberOfShards, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get());
final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex);
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
ensureFollowerGreen(true, followerIndex);
pauseFollow(followerIndex);
followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet();
// we will disrupt requests to remove retention leases for these random shards
final Set<Integer> shardIds =
new HashSet<>(randomSubsetOf(randomIntBetween(1, 4), IntStream.range(0, 4).boxed().collect(Collectors.toSet())));
final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get();
try {
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getDataNodes().values()) {
final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
final ClusterStateResponse leaderClusterState =
leaderClient().admin().cluster().prepareState().clear().setNodes(true).get();
for (final ObjectCursor<DiscoveryNode> receiverNode : leaderClusterState.getState().nodes().getDataNodes().values()) {
final MockTransportService receiverTransportService =
(MockTransportService) getLeaderCluster().getInstance(TransportService.class, receiverNode.value.getName());
senderTransportService.addSendBehavior(receiverTransportService,
(connection, requestId, action, request, options) -> {
if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action)) {
final RetentionLeaseActions.RemoveRequest removeRequest = (RetentionLeaseActions.RemoveRequest) request;
if (shardIds.contains(removeRequest.getShardId().id())) {
throw randomBoolean()
? new ConnectTransportException(receiverNode.value, "connection failed")
: new IndexShardClosedException(removeRequest.getShardId());
}
}
connection.sendRequest(requestId, action, request, options);
});
}
}
final ElasticsearchException e = expectThrows(
ElasticsearchException.class,
() -> followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet());
final ClusterStateResponse followerIndexClusterState =
followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get();
final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID();
final ClusterStateResponse leaderIndexClusterState =
leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get();
final String leaderUUID = leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID();
assertThat(
e.getMetadata("es.failed_to_remove_retention_leases"),
contains(retentionLeaseId(
getFollowerCluster().getClusterName(),
new Index(followerIndex, followerUUID),
getLeaderCluster().getClusterName(),
new Index(leaderIndex, leaderUUID))));
} finally {
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getDataNodes().values()) {
final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
senderTransportService.clearAllRules();
}
}
}
/**
* Extract the shard stats from an indices stats response, with the stats ordered by shard ID with primaries first. This is to have a
* consistent ordering when comparing two responses.
@ -378,7 +540,7 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
* @param stats the indices stats
* @return the shard stats in sorted order with (shard ID, primary) as the sort key
*/
private List<ShardStats> getShardStats(final IndicesStatsResponse stats) {
private List<ShardStats> getShardsStats(final IndicesStatsResponse stats) {
return Arrays.stream(stats.getShards())
.sorted((s, t) -> {
if (s.getShardRouting().shardId().id() == t.getShardRouting().shardId().id()) {
@ -390,6 +552,18 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
.collect(Collectors.toList());
}
private String getRetentionLeaseId(final String followerIndex, final String leaderIndex) {
final ClusterStateResponse followerIndexClusterState =
followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get();
final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID();
final ClusterStateResponse leaderIndexClusterState =
leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get();
final String leaderUUID = leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID();
return getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID);
}
private String getRetentionLeaseId(String followerIndex, String followerUUID, String leaderIndex, String leaderUUID) {
return retentionLeaseId(
getFollowerCluster().getClusterName(),

View File

@ -12,13 +12,13 @@ import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
import java.util.Objects;

View File

@ -31,6 +31,7 @@ public final class SystemPrivilege extends Privilege {
RetentionLeaseSyncAction.ACTION_NAME + "*", // needed for retention lease syncs
RetentionLeaseBackgroundSyncAction.ACTION_NAME + "*", // needed for background retention lease syncs
RetentionLeaseActions.Add.ACTION_NAME + "*", // needed for CCR to add retention leases
RetentionLeaseActions.Remove.ACTION_NAME + "*", // needed for CCR to remove retention leases
RetentionLeaseActions.Renew.ACTION_NAME + "*", // needed for CCR to renew retention leases
"indices:admin/settings/update" // needed for DiskThresholdMonitor.markIndicesReadOnly
);

View File

@ -135,6 +135,8 @@ public class PrivilegeTests extends ESTestCase {
assertThat(predicate.test("indices:admin/seq_no/retention_lease_background_sync[r]"), is(true));
assertThat(predicate.test("indices:admin/seq_no/add_retention_lease"), is(true));
assertThat(predicate.test("indices:admin/seq_no/add_retention_lease[s]"), is(true));
assertThat(predicate.test("indices:admin/seq_no/remove_retention_lease"), is(true));
assertThat(predicate.test("indices:admin/seq_no/remove_retention_lease[s]"), is(true));
assertThat(predicate.test("indices:admin/seq_no/renew_retention_lease"), is(true));
assertThat(predicate.test("indices:admin/seq_no/renew_retention_lease[s]"), is(true));
assertThat(predicate.test("indices:admin/settings/update"), is(true));

View File

@ -326,9 +326,6 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
}
}
// Specifically, this is waiting for this bullet to be complete:
// - integrate shard history retention leases with cross-cluster replication
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37165")
public void testCannotShrinkLeaderIndex() throws Exception {
String indexName = "shrink-leader-test";
String shrunkenIndexName = "shrink-" + indexName;

View File

@ -262,6 +262,7 @@ public class AuthorizationServiceTests extends ESTestCase {
"indices:admin/seq_no/retention_lease_sync",
"indices:admin/seq_no/retention_lease_background_sync",
"indices:admin/seq_no/add_retention_lease",
"indices:admin/seq_no/remove_retention_lease",
"indices:admin/seq_no/renew_retention_lease",
"indices:admin/settings/update" };
for (String action : actions) {