From 09ea3ccd169db4e70743b53714f0b2e1b4a4d64e Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 20 Feb 2019 07:05:47 -0500 Subject: [PATCH] Remove retention leases when unfollowing (#39088) This commit attempts to remove the retention leases on the leader shards when unfollowing an index. This is best effort, since the leader might not be available. --- .../xpack/ccr/CcrRetentionLeases.java | 120 +++++++++ .../ccr/action/TransportUnfollowAction.java | 156 +++++++++++- .../xpack/ccr/repository/CcrRepository.java | 64 +---- .../xpack/ccr/CcrRetentionLeaseIT.java | 238 +++++++++++++++--- .../xpack/core/ccr/client/CcrClient.java | 8 +- .../authz/privilege/SystemPrivilege.java | 1 + .../authz/privilege/PrivilegeTests.java | 2 + .../indexlifecycle/CCRIndexLifecycleIT.java | 3 - .../authz/AuthorizationServiceTests.java | 1 + 9 files changed, 487 insertions(+), 106 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java index 122fbdb969a..6afef8c42aa 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java @@ -6,9 +6,20 @@ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; +import org.elasticsearch.index.seqno.RetentionLeaseActions; +import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException; +import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException; +import org.elasticsearch.index.shard.ShardId; import java.util.Locale; +import java.util.Optional; + +import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; public class CcrRetentionLeases { @@ -37,4 +48,113 @@ public class CcrRetentionLeases { leaderIndex.getUUID()); } + /** + * Synchronously requests to add a retention lease with the specified retention lease ID on the specified leader shard using the given + * remote client. Note that this method will block up to the specified timeout. + * + * @param leaderShardId the leader shard ID + * @param retentionLeaseId the retention lease ID + * @param remoteClient the remote client on which to execute this request + * @param timeout the timeout + * @return an optional exception indicating whether or not the retention lease already exists + */ + public static Optional syncAddRetentionLease( + final ShardId leaderShardId, + final String retentionLeaseId, + final Client remoteClient, + final TimeValue timeout) { + try { + final PlainActionFuture response = new PlainActionFuture<>(); + asyncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response); + response.actionGet(timeout); + return Optional.empty(); + } catch (final RetentionLeaseAlreadyExistsException e) { + return Optional.of(e); + } + } + + /** + * Asynchronously requests to add a retention lease with the specified retention lease ID on the specified leader shard using the given + * remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a response + * or failure. + * + * @param leaderShardId the leader shard ID + * @param retentionLeaseId the retention lease ID + * @param remoteClient the remote client on which to execute this request + * @param listener the listener + */ + public static void asyncAddRetentionLease( + final ShardId leaderShardId, + final String retentionLeaseId, + final Client remoteClient, + final ActionListener listener) { + final RetentionLeaseActions.AddRequest request = + new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr"); + remoteClient.execute(RetentionLeaseActions.Add.INSTANCE, request, listener); + } + + /** + * Synchronously requests to renew a retention lease with the specified retention lease ID on the specified leader shard using the given + * remote client. Note that this method will block up to the specified timeout. + * + * @param leaderShardId the leader shard ID + * @param retentionLeaseId the retention lease ID + * @param remoteClient the remote client on which to execute this request + * @param timeout the timeout + * @return an optional exception indicating whether or not the retention lease already exists + */ + public static Optional syncRenewRetentionLease( + final ShardId leaderShardId, + final String retentionLeaseId, + final Client remoteClient, + final TimeValue timeout) { + try { + final PlainActionFuture response = new PlainActionFuture<>(); + asyncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response); + response.actionGet(timeout); + return Optional.empty(); + } catch (final RetentionLeaseNotFoundException e) { + return Optional.of(e); + } + } + + /** + * Asynchronously requests to renew a retention lease with the specified retention lease ID on the specified leader shard using the + * given remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a + * response or failure. + * + * @param leaderShardId the leader shard ID + * @param retentionLeaseId the retention lease ID + * @param remoteClient the remote client on which to execute this request + * @param listener the listener + */ + public static void asyncRenewRetentionLease( + final ShardId leaderShardId, + final String retentionLeaseId, + final Client remoteClient, + final ActionListener listener) { + final RetentionLeaseActions.RenewRequest request = + new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr"); + remoteClient.execute(RetentionLeaseActions.Renew.INSTANCE, request, listener); + } + + /** + * Asynchronously requests to remove a retention lease with the specified retention lease ID on the specified leader shard using the + * given remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a + * response or failure. + * + * @param leaderShardId the leader shard ID + * @param retentionLeaseId the retention lease ID + * @param remoteClient the remote client on which to execute this request + * @param listener the listener + */ + public static void asyncRemoveRetentionLease( + final ShardId leaderShardId, + final String retentionLeaseId, + final Client remoteClient, + final ActionListener listener) { + final RetentionLeaseActions.RemoveRequest request = new RetentionLeaseActions.RemoveRequest(leaderShardId, retentionLeaseId); + remoteClient.execute(RetentionLeaseActions.Remove.INSTANCE, request, listener); + } + } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java index 3a158aceddb..0e6b0ccceff 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java @@ -6,10 +6,16 @@ package org.elasticsearch.xpack.ccr.action; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -20,22 +26,46 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.seqno.RetentionLeaseActions; +import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.Ccr; +import org.elasticsearch.xpack.ccr.CcrRetentionLeases; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + public class TransportUnfollowAction extends TransportMasterNodeAction { + private final Client client; + @Inject - public TransportUnfollowAction(TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver) { - super(UnfollowAction.NAME, transportService, clusterService, threadPool, actionFilters, - UnfollowAction.Request::new, indexNameExpressionResolver); + public TransportUnfollowAction( + final TransportService transportService, + final ClusterService clusterService, + final ThreadPool threadPool, + final ActionFilters actionFilters, + final IndexNameExpressionResolver indexNameExpressionResolver, + final Client client) { + super( + UnfollowAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + UnfollowAction.Request::new, + indexNameExpressionResolver); + this.client = Objects.requireNonNull(client); } @Override @@ -49,26 +79,128 @@ public class TransportUnfollowAction extends TransportMasterNodeAction listener) throws Exception { + protected void masterOperation( + final UnfollowAction.Request request, + final ClusterState state, + final ActionListener listener) { clusterService.submitStateUpdateTask("unfollow_action", new ClusterStateUpdateTask() { @Override - public ClusterState execute(ClusterState current) throws Exception { + public ClusterState execute(final ClusterState current) { String followerIndex = request.getFollowerIndex(); return unfollow(followerIndex, current); } @Override - public void onFailure(String source, Exception e) { + public void onFailure(final String source, final Exception e) { listener.onFailure(e); } @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new AcknowledgedResponse(true)); + public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { + final IndexMetaData indexMetaData = oldState.metaData().index(request.getFollowerIndex()); + final Map ccrCustomMetaData = indexMetaData.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); + final String remoteClusterName = ccrCustomMetaData.get(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY); + final Client remoteClient = client.getRemoteClusterClient(remoteClusterName); + final String leaderIndexName = ccrCustomMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY); + final String leaderIndexUuid = ccrCustomMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); + final Index leaderIndex = new Index(leaderIndexName, leaderIndexUuid); + final String retentionLeaseId = CcrRetentionLeases.retentionLeaseId( + oldState.getClusterName().value(), + indexMetaData.getIndex(), + remoteClusterName, + leaderIndex); + final int numberOfShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexMetaData.getSettings()); + + final GroupedActionListener groupListener = new GroupedActionListener<>( + new ActionListener>() { + + @Override + public void onResponse(final Collection responses) { + logger.trace( + "[{}] removed retention lease [{}] on all leader primary shards", + indexMetaData.getIndex(), + retentionLeaseId); + listener.onResponse(new AcknowledgedResponse(true)); + } + + @Override + public void onFailure(final Exception e) { + logger.warn(new ParameterizedMessage( + "[{}] failure while removing retention lease [{}] on leader primary shards", + indexMetaData.getIndex(), + retentionLeaseId), + e); + final ElasticsearchException wrapper = new ElasticsearchException(e); + wrapper.addMetadata("es.failed_to_remove_retention_leases", retentionLeaseId); + listener.onFailure(wrapper); + } + + }, + numberOfShards, + Collections.emptyList()); + for (int i = 0; i < numberOfShards; i++) { + final ShardId followerShardId = new ShardId(indexMetaData.getIndex(), i); + final ShardId leaderShardId = new ShardId(leaderIndex, i); + removeRetentionLeaseForShard( + followerShardId, + leaderShardId, + retentionLeaseId, + remoteClient, + ActionListener.wrap( + groupListener::onResponse, + e -> handleException( + followerShardId, + retentionLeaseId, + leaderShardId, + groupListener, + e))); + } } + + private void removeRetentionLeaseForShard( + final ShardId followerShardId, + final ShardId leaderShardId, + final String retentionLeaseId, + final Client remoteClient, + final ActionListener listener) { + logger.trace("{} removing retention lease [{}] while unfollowing leader index", followerShardId, retentionLeaseId); + final ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { + // we have to execute under the system context so that if security is enabled the removal is authorized + threadContext.markAsSystemContext(); + CcrRetentionLeases.asyncRemoveRetentionLease(leaderShardId, retentionLeaseId, remoteClient, listener); + } + } + + private void handleException( + final ShardId followerShardId, + final String retentionLeaseId, + final ShardId leaderShardId, + final ActionListener listener, + final Exception e) { + final Throwable cause = ExceptionsHelper.unwrapCause(e); + assert cause instanceof ElasticsearchSecurityException == false : e; + if (cause instanceof RetentionLeaseNotFoundException) { + // treat as success + logger.trace(new ParameterizedMessage( + "{} retention lease [{}] not found on {} while unfollowing", + followerShardId, + retentionLeaseId, + leaderShardId), + e); + listener.onResponse(new RetentionLeaseActions.Response()); + } else { + logger.warn(new ParameterizedMessage( + "{} failed to remove retention lease [{}] on {} while unfollowing", + followerShardId, + retentionLeaseId, + leaderShardId), + e); + listener.onFailure(e); + } + } + }); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 41cce3f5b0b..8d07d05cdb9 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -44,7 +44,6 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.seqno.LocalCheckpointTracker; -import org.elasticsearch.index.seqno.RetentionLeaseActions; import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException; import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException; import org.elasticsearch.index.shard.IndexShard; @@ -70,6 +69,7 @@ import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; +import org.elasticsearch.xpack.ccr.CcrRetentionLeases; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.action.CcrRequests; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; @@ -94,9 +94,10 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongConsumer; import java.util.function.Supplier; -import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId; +import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncAddRetentionLease; +import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncRenewRetentionLease; /** @@ -321,12 +322,12 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit // schedule renewals to run during the restore final Scheduler.Cancellable renewable = threadPool.scheduleWithFixedDelay( () -> { - logger.trace("{} background renewal of retention lease [{}] during restore", shardId, retentionLeaseId); + logger.trace("{} background renewal of retention lease [{}] during restore", indexShard.shardId(), retentionLeaseId); final ThreadContext threadContext = threadPool.getThreadContext(); try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { // we have to execute under the system context so that if security is enabled the renewal is authorized threadContext.markAsSystemContext(); - asyncRenewRetentionLease( + CcrRetentionLeases.asyncRenewRetentionLease( leaderShardId, retentionLeaseId, remoteClient, @@ -377,8 +378,9 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit final Client remoteClient) { logger.trace( () -> new ParameterizedMessage("{} requesting leader to add retention lease [{}]", shardId, retentionLeaseId)); + final TimeValue timeout = ccrSettings.getRecoveryActionTimeout(); final Optional maybeAddAlready = - syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient); + syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, timeout); maybeAddAlready.ifPresent(addAlready -> { logger.trace(() -> new ParameterizedMessage( "{} retention lease [{}] already exists, requesting a renewal", @@ -386,7 +388,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit retentionLeaseId), addAlready); final Optional maybeRenewNotFound = - syncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient); + syncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, timeout); maybeRenewNotFound.ifPresent(renewNotFound -> { logger.trace(() -> new ParameterizedMessage( "{} retention lease [{}] not found while attempting to renew, requesting a final add", @@ -394,7 +396,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit retentionLeaseId), renewNotFound); final Optional maybeFallbackAddAlready = - syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient); + syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, timeout); maybeFallbackAddAlready.ifPresent(fallbackAddAlready -> { /* * At this point we tried to add the lease and the retention lease already existed. By the time we tried to renew the @@ -407,54 +409,6 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit }); } - private Optional syncAddRetentionLease( - final ShardId leaderShardId, - final String retentionLeaseId, - final Client remoteClient) { - try { - final PlainActionFuture response = new PlainActionFuture<>(); - asyncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response); - response.actionGet(ccrSettings.getRecoveryActionTimeout()); - return Optional.empty(); - } catch (final RetentionLeaseAlreadyExistsException e) { - return Optional.of(e); - } - } - - private void asyncAddRetentionLease( - final ShardId leaderShardId, - final String retentionLeaseId, - final Client remoteClient, - final ActionListener listener) { - final RetentionLeaseActions.AddRequest request = - new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr"); - remoteClient.execute(RetentionLeaseActions.Add.INSTANCE, request, listener); - } - - private Optional syncRenewRetentionLease( - final ShardId leaderShardId, - final String retentionLeaseId, - final Client remoteClient) { - try { - final PlainActionFuture response = new PlainActionFuture<>(); - asyncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response); - response.actionGet(ccrSettings.getRecoveryActionTimeout()); - return Optional.empty(); - } catch (final RetentionLeaseNotFoundException e) { - return Optional.of(e); - } - } - - private void asyncRenewRetentionLease( - final ShardId leaderShardId, - final String retentionLeaseId, - final Client remoteClient, - final ActionListener listener) { - final RetentionLeaseActions.RenewRequest request = - new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr"); - remoteClient.execute(RetentionLeaseActions.Renew.INSTANCE, request, listener); - } - // this setting is intentionally not registered, it is only used in tests public static final Setting RETENTION_LEASE_RENEW_INTERVAL_SETTING = Setting.timeSetting( diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index c42887e6b52..29430ff6d25 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -7,9 +7,12 @@ package org.elasticsearch.xpack.ccr; import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; @@ -29,15 +32,22 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.seqno.RetentionLease; +import org.elasticsearch.index.seqno.RetentionLeaseActions; import org.elasticsearch.index.seqno.RetentionLeases; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardClosedException; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.snapshots.RestoreInfo; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.repository.CcrRepository; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; +import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; import java.io.IOException; import java.util.ArrayList; @@ -45,17 +55,23 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; +import static java.util.Collections.singletonMap; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId; import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; @@ -148,26 +164,19 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase { final PlainActionFuture future = PlainActionFuture.newFuture(); restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); - final ClusterStateResponse leaderIndexClusterState = - leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get(); - final String leaderUUID = leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID(); - // ensure that a retention lease has been put in place on each shard assertBusy(() -> { final IndicesStatsResponse stats = leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); assertNotNull(stats.getShards()); assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); - final List shardStats = getShardStats(stats); + final List shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { - final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases(); + final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); assertThat(currentRetentionLeases.leases(), hasSize(1)); - final ClusterStateResponse followerIndexClusterState = - followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); - final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); final RetentionLease retentionLease = currentRetentionLeases.leases().iterator().next(); - assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID))); + assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex))); } }); @@ -215,7 +224,6 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase { } connection.sendRequest(requestId, action, request, options); }); - } } @@ -223,10 +231,6 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase { final PlainActionFuture future = PlainActionFuture.newFuture(); restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); - final ClusterStateResponse leaderIndexClusterState = - leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get(); - final String leaderUUID = leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID(); - try { // ensure that a retention lease has been put in place on each shard, and grab a copy of them final List retentionLeases = new ArrayList<>(); @@ -236,16 +240,13 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase { leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); assertNotNull(stats.getShards()); assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); - final List shardStats = getShardStats(stats); + final List shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { - final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases(); + final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); assertThat(currentRetentionLeases.leases(), hasSize(1)); - final ClusterStateResponse followerIndexClusterState = - followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); - final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); final RetentionLease retentionLease = currentRetentionLeases.leases().iterator().next(); - assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID))); + assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex))); retentionLeases.add(currentRetentionLeases); } }); @@ -256,16 +257,13 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase { leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); assertNotNull(stats.getShards()); assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); - final List shardStats = getShardStats(stats); + final List shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { - final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases(); + final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); assertThat(currentRetentionLeases.leases(), hasSize(1)); - final ClusterStateResponse followerIndexClusterState = - followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); - final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); final RetentionLease retentionLease = currentRetentionLeases.leases().iterator().next(); - assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID))); + assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex))); // we assert that retention leases are being renewed by an increase in the timestamp assertThat(retentionLease.timestamp(), greaterThan(retentionLeases.get(i).leases().iterator().next().timestamp())); } @@ -321,9 +319,9 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase { leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); assertNotNull(stats.getShards()); assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); - final List shardStats = getShardStats(stats); + final List shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { - final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases(); + final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); assertThat(currentRetentionLeases.leases(), hasSize(1)); final ClusterStateResponse followerIndexClusterState = followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); @@ -349,9 +347,9 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase { leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); assertNotNull(stats.getShards()); assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); - final List shardStats = getShardStats(stats); + final List shardsStats = getShardsStats(stats); for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { - final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases(); + final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); assertThat(currentRetentionLeases.leases(), hasSize(1)); final ClusterStateResponse followerIndexClusterState = followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); @@ -371,6 +369,170 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase { } } + public void testUnfollowRemovesRetentionLeases() throws Exception { + final String leaderIndex = "leader"; + final String followerIndex = "follower"; + final String leaderIndexSettings = + getIndexSettings(randomIntBetween(1, 4), 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, followerIndex); + + final String retentionLeaseId = getRetentionLeaseId(followerIndex, leaderIndex); + + final IndicesStatsResponse stats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + final List shardsStats = getShardsStats(stats); + for (final ShardStats shardStats : shardsStats) { + assertThat(shardStats.getRetentionLeaseStats().retentionLeases().leases(), hasSize(1)); + assertThat( + shardStats.getRetentionLeaseStats().retentionLeases().leases().iterator().next().id(), + equalTo(retentionLeaseId)); + } + + // we will sometimes fake that some of the retention leases are already removed on the leader shard + final Set shardIds = + new HashSet<>(randomSubsetOf(randomIntBetween(0, 4), IntStream.range(0, 4).boxed().collect(Collectors.toSet()))); + + final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); + try { + for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { + final MockTransportService senderTransportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + final ClusterStateResponse leaderClusterState = + leaderClient().admin().cluster().prepareState().clear().setNodes(true).get(); + for (final ObjectCursor receiverNode : leaderClusterState.getState().nodes().getDataNodes().values()) { + final MockTransportService receiverTransportService = + (MockTransportService) getLeaderCluster().getInstance(TransportService.class, receiverNode.value.getName()); + senderTransportService.addSendBehavior(receiverTransportService, + (connection, requestId, action, request, options) -> { + if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action)) { + final RetentionLeaseActions.RemoveRequest removeRequest = (RetentionLeaseActions.RemoveRequest) request; + if (shardIds.contains(removeRequest.getShardId().id())) { + final String primaryShardNodeId = + getLeaderCluster() + .clusterService() + .state() + .routingTable() + .index(leaderIndex) + .shard(removeRequest.getShardId().id()) + .primaryShard() + .currentNodeId(); + final String primaryShardNodeName = + getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName(); + final IndexShard primary = + getLeaderCluster() + .getInstance(IndicesService.class, primaryShardNodeName) + .getShardOrNull(removeRequest.getShardId()); + final CountDownLatch latch = new CountDownLatch(1); + primary.removeRetentionLease( + retentionLeaseId, + ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()))); + try { + latch.await(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + fail(e.toString()); + } + } + } + connection.sendRequest(requestId, action, request, options); + }); + } + + } + + + pauseFollow(followerIndex); + followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet(); + assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet()); + + final IndicesStatsResponse afterUnfollowStats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + final List afterUnfollowShardsStats = getShardsStats(afterUnfollowStats); + for (final ShardStats shardStats : afterUnfollowShardsStats) { + assertThat(shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty()); + } + } finally { + + } + } + + public void testUnfollowFailsToRemoveRetentionLeases() throws Exception { + final String leaderIndex = "leader"; + final String followerIndex = "follower"; + final int numberOfShards = randomIntBetween(1, 4); + final String leaderIndexSettings = + getIndexSettings(numberOfShards, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, followerIndex); + + pauseFollow(followerIndex); + followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet(); + + // we will disrupt requests to remove retention leases for these random shards + final Set shardIds = + new HashSet<>(randomSubsetOf(randomIntBetween(1, 4), IntStream.range(0, 4).boxed().collect(Collectors.toSet()))); + + final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); + try { + for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { + final MockTransportService senderTransportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + final ClusterStateResponse leaderClusterState = + leaderClient().admin().cluster().prepareState().clear().setNodes(true).get(); + for (final ObjectCursor receiverNode : leaderClusterState.getState().nodes().getDataNodes().values()) { + final MockTransportService receiverTransportService = + (MockTransportService) getLeaderCluster().getInstance(TransportService.class, receiverNode.value.getName()); + senderTransportService.addSendBehavior(receiverTransportService, + (connection, requestId, action, request, options) -> { + if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action)) { + final RetentionLeaseActions.RemoveRequest removeRequest = (RetentionLeaseActions.RemoveRequest) request; + if (shardIds.contains(removeRequest.getShardId().id())) { + throw randomBoolean() + ? new ConnectTransportException(receiverNode.value, "connection failed") + : new IndexShardClosedException(removeRequest.getShardId()); + } + } + connection.sendRequest(requestId, action, request, options); + }); + } + + } + + final ElasticsearchException e = expectThrows( + ElasticsearchException.class, + () -> followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet()); + + final ClusterStateResponse followerIndexClusterState = + followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); + final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); + + final ClusterStateResponse leaderIndexClusterState = + leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get(); + final String leaderUUID = leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID(); + + assertThat( + e.getMetadata("es.failed_to_remove_retention_leases"), + contains(retentionLeaseId( + getFollowerCluster().getClusterName(), + new Index(followerIndex, followerUUID), + getLeaderCluster().getClusterName(), + new Index(leaderIndex, leaderUUID)))); + } finally { + for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { + final MockTransportService senderTransportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + senderTransportService.clearAllRules(); + } + } + } + /** * Extract the shard stats from an indices stats response, with the stats ordered by shard ID with primaries first. This is to have a * consistent ordering when comparing two responses. @@ -378,7 +540,7 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase { * @param stats the indices stats * @return the shard stats in sorted order with (shard ID, primary) as the sort key */ - private List getShardStats(final IndicesStatsResponse stats) { + private List getShardsStats(final IndicesStatsResponse stats) { return Arrays.stream(stats.getShards()) .sorted((s, t) -> { if (s.getShardRouting().shardId().id() == t.getShardRouting().shardId().id()) { @@ -390,6 +552,18 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase { .collect(Collectors.toList()); } + private String getRetentionLeaseId(final String followerIndex, final String leaderIndex) { + final ClusterStateResponse followerIndexClusterState = + followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); + final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); + + final ClusterStateResponse leaderIndexClusterState = + leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get(); + final String leaderUUID = leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID(); + + return getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID); + } + private String getRetentionLeaseId(String followerIndex, String followerUUID, String leaderIndex, String leaderUUID) { return retentionLeaseId( getFollowerCluster().getClusterName(), diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java index 43305b030be..f35a1431433 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java @@ -12,13 +12,13 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; -import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; -import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; -import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; +import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; -import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; +import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; +import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; import java.util.Objects; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java index 523a8101749..dda81e6b861 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java @@ -31,6 +31,7 @@ public final class SystemPrivilege extends Privilege { RetentionLeaseSyncAction.ACTION_NAME + "*", // needed for retention lease syncs RetentionLeaseBackgroundSyncAction.ACTION_NAME + "*", // needed for background retention lease syncs RetentionLeaseActions.Add.ACTION_NAME + "*", // needed for CCR to add retention leases + RetentionLeaseActions.Remove.ACTION_NAME + "*", // needed for CCR to remove retention leases RetentionLeaseActions.Renew.ACTION_NAME + "*", // needed for CCR to renew retention leases "indices:admin/settings/update" // needed for DiskThresholdMonitor.markIndicesReadOnly ); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java index 46db9e83f77..4af7dd2e57d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java @@ -135,6 +135,8 @@ public class PrivilegeTests extends ESTestCase { assertThat(predicate.test("indices:admin/seq_no/retention_lease_background_sync[r]"), is(true)); assertThat(predicate.test("indices:admin/seq_no/add_retention_lease"), is(true)); assertThat(predicate.test("indices:admin/seq_no/add_retention_lease[s]"), is(true)); + assertThat(predicate.test("indices:admin/seq_no/remove_retention_lease"), is(true)); + assertThat(predicate.test("indices:admin/seq_no/remove_retention_lease[s]"), is(true)); assertThat(predicate.test("indices:admin/seq_no/renew_retention_lease"), is(true)); assertThat(predicate.test("indices:admin/seq_no/renew_retention_lease[s]"), is(true)); assertThat(predicate.test("indices:admin/settings/update"), is(true)); diff --git a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java index b3c93acb97b..eaabab6d033 100644 --- a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java @@ -326,9 +326,6 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase { } } - // Specifically, this is waiting for this bullet to be complete: - // - integrate shard history retention leases with cross-cluster replication - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37165") public void testCannotShrinkLeaderIndex() throws Exception { String indexName = "shrink-leader-test"; String shrunkenIndexName = "shrink-" + indexName; diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java index bde5949d378..b608a569497 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java @@ -262,6 +262,7 @@ public class AuthorizationServiceTests extends ESTestCase { "indices:admin/seq_no/retention_lease_sync", "indices:admin/seq_no/retention_lease_background_sync", "indices:admin/seq_no/add_retention_lease", + "indices:admin/seq_no/remove_retention_lease", "indices:admin/seq_no/renew_retention_lease", "indices:admin/settings/update" }; for (String action : actions) {