diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java index f35e4906131..6fa1fd7fb3f 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java @@ -115,7 +115,7 @@ public class RetentionLeaseActions { } @Override - protected Response shardOperation(final T request, final ShardId shardId) throws IOException { + protected Response shardOperation(final T request, final ShardId shardId) { throw new UnsupportedOperationException(); } @@ -136,10 +136,10 @@ public class RetentionLeaseActions { public static class Add extends Action { public static final Add INSTANCE = new Add(); - public static final String NAME = "indices:admin/seq_no/add_retention_lease"; + public static final String ACTION_NAME = "indices:admin/seq_no/add_retention_lease"; private Add() { - super(NAME); + super(ACTION_NAME); } public static class TransportAction extends TransportRetentionLeaseAction { @@ -153,7 +153,7 @@ public class RetentionLeaseActions { final IndexNameExpressionResolver indexNameExpressionResolver, final IndicesService indicesService) { super( - NAME, + ACTION_NAME, threadPool, clusterService, transportService, @@ -186,10 +186,10 @@ public class RetentionLeaseActions { public static class Renew extends Action { public static final Renew INSTANCE = new Renew(); - public static final String NAME = "indices:admin/seq_no/renew_retention_lease"; + public static final String ACTION_NAME = "indices:admin/seq_no/renew_retention_lease"; private Renew() { - super(NAME); + super(ACTION_NAME); } public static class TransportAction extends TransportRetentionLeaseAction { @@ -203,7 +203,7 @@ public class RetentionLeaseActions { final IndexNameExpressionResolver indexNameExpressionResolver, final IndicesService indicesService) { super( - NAME, + ACTION_NAME, threadPool, clusterService, transportService, @@ -232,10 +232,10 @@ public class RetentionLeaseActions { public static class Remove extends Action { public static final Remove INSTANCE = new Remove(); - public static final String NAME = "indices:admin/seq_no/remove_retention_lease"; + public static final String ACTION_NAME = "indices:admin/seq_no/remove_retention_lease"; private Remove() { - super(NAME); + super(ACTION_NAME); } public static class TransportAction extends TransportRetentionLeaseAction { @@ -249,7 +249,7 @@ public class RetentionLeaseActions { final IndexNameExpressionResolver indexNameExpressionResolver, final IndicesService indicesService) { super( - NAME, + ACTION_NAME, threadPool, clusterService, transportService, diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseAlreadyExistsException.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseAlreadyExistsException.java index aaa41a7b400..ffd5e96e6a5 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseAlreadyExistsException.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseAlreadyExistsException.java @@ -27,7 +27,7 @@ import java.util.Objects; public class RetentionLeaseAlreadyExistsException extends ResourceAlreadyExistsException { - RetentionLeaseAlreadyExistsException(final String id) { + public RetentionLeaseAlreadyExistsException(final String id) { super("retention lease with ID [" + Objects.requireNonNull(id) + "] already exists"); } diff --git 
a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseNotFoundException.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseNotFoundException.java index d975077327f..2b13ae6b448 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseNotFoundException.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseNotFoundException.java @@ -27,7 +27,7 @@ import java.util.Objects; public class RetentionLeaseNotFoundException extends ResourceNotFoundException { - RetentionLeaseNotFoundException(final String id) { + public RetentionLeaseNotFoundException(final String id) { super("retention lease with ID [" + Objects.requireNonNull(id) + "] not found"); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java new file mode 100644 index 00000000000..122fbdb969a --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.index.Index; + +import java.util.Locale; + +public class CcrRetentionLeases { + + /** + * The retention lease ID used by followers. + * + * @param localClusterName the local cluster name + * @param followerIndex the follower index + * @param remoteClusterAlias the remote cluster alias + * @param leaderIndex the leader index + * @return the retention lease ID + */ + public static String retentionLeaseId( + final String localClusterName, + final Index followerIndex, + final String remoteClusterAlias, + final Index leaderIndex) { + return String.format( + Locale.ROOT, + "%s/%s/%s-following-%s/%s/%s", + localClusterName, + followerIndex.getName(), + followerIndex.getUUID(), + remoteClusterAlias, + leaderIndex.getName(), + leaderIndex.getUUID()); + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 88f4e974bea..41cce3f5b0b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -8,8 +8,12 @@ package org.elasticsearch.xpack.ccr.repository; import com.carrotsearch.hppc.cursors.IntObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.IndexCommit; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -19,6 +23,7 @@ import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.support.ListenerTimeouts; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.metadata.IndexMetaData; import 
org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -30,13 +35,18 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.seqno.LocalCheckpointTracker; +import org.elasticsearch.index.seqno.RetentionLeaseActions; +import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException; +import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardRecoveryException; import org.elasticsearch.index.shard.ShardId; @@ -56,6 +66,7 @@ import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotShardFailure; import org.elasticsearch.snapshots.SnapshotState; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; @@ -76,12 +87,16 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongConsumer; import java.util.function.Supplier; +import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; +import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId; /** @@ -90,6 +105,8 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; */ public class CcrRepository extends AbstractLifecycleComponent implements Repository { + private static final Logger logger = LogManager.getLogger(CcrRepository.class); + public static final String LATEST = "_latest_"; public static final String TYPE = "_ccr_"; public static final String NAME_PREFIX = "_ccr_"; @@ -98,6 +115,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit private final RepositoryMetaData metadata; private final CcrSettings ccrSettings; + private final String localClusterName; private final String remoteClusterAlias; private final Client client; private final CcrLicenseChecker ccrLicenseChecker; @@ -109,6 +127,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit CcrSettings ccrSettings, ThreadPool threadPool) { this.metadata = metadata; this.ccrSettings = ccrSettings; + this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings).value(); assert metadata.name().startsWith(NAME_PREFIX) : "CcrRepository metadata.name() must start with: " + NAME_PREFIX; this.remoteClusterAlias = Strings.split(metadata.name(), NAME_PREFIX)[1]; this.ccrLicenseChecker = ccrLicenseChecker; @@ -136,10 +155,14 @@ public class CcrRepository extends 
AbstractLifecycleComponent implements Reposit return metadata; } + private Client getRemoteClusterClient() { + return client.getRemoteClusterClient(remoteClusterAlias); + } + @Override public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId"; - Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); + Client remoteClient = getRemoteClusterClient(); ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true).setNodes(true) .get(ccrSettings.getRecoveryActionTimeout()); ImmutableOpenMap indicesMap = response.getState().metaData().indices(); @@ -152,7 +175,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit @Override public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) { assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId"; - Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); + Client remoteClient = getRemoteClusterClient(); // We set a single dummy index name to avoid fetching all the index data ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest("dummy_index_name"); ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest) @@ -164,7 +187,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException { assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId"; String leaderIndex = index.getName(); - Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); + Client remoteClient = getRemoteClusterClient(); ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex); ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest) @@ -203,7 +226,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit @Override public RepositoryData getRepositoryData() { - Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); + Client remoteClient = getRemoteClusterClient(); ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true) .get(ccrSettings.getRecoveryActionTimeout()); MetaData remoteMetaData = response.getState().getMetaData(); @@ -280,33 +303,167 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId, RecoveryState recoveryState) { // TODO: Add timeouts to network calls / the restore process. 
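// Overview of the restructured restoreShard flow introduced below (a summary hedged from the hunk itself, not text from the change):
//   1. create an empty store for the follower shard;
//   2. derive the follower's retention lease ID and acquire that lease on the leader shard;
//   3. schedule background renewals of the lease on the CCR thread pool for the duration of the restore;
//   4. restore files through a remote restore session and update mappings;
//   5. cancel the background renewals in a finally block when the restore completes or fails.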
- final Store store = indexShard.store(); - store.incRef(); - try { - store.createEmpty(indexShard.indexSettings().getIndexMetaData().getCreationVersion().luceneVersion); - } catch (EngineException | IOException e) { - throw new IndexShardRecoveryException(shardId, "failed to create empty store", e); - } finally { - store.decRef(); - } + createEmptyStore(indexShard, shardId); - Map ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); - String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); - Index leaderIndex = new Index(shardId.getIndexName(), leaderUUID); - ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId()); + final Map ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); + final String leaderIndexName = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY); + final String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); + final Index leaderIndex = new Index(leaderIndexName, leaderUUID); + final ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId()); + + final Client remoteClient = getRemoteClusterClient(); + + final String retentionLeaseId = + retentionLeaseId(localClusterName, indexShard.shardId().getIndex(), remoteClusterAlias, leaderIndex); + + acquireRetentionLeaseOnLeader(shardId, retentionLeaseId, leaderShardId, remoteClient); + + // schedule renewals to run during the restore + final Scheduler.Cancellable renewable = threadPool.scheduleWithFixedDelay( + () -> { + logger.trace("{} background renewal of retention lease [{}] during restore", shardId, retentionLeaseId); + final ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + // we have to execute under the system context so that if security is enabled the renewal is authorized + threadContext.markAsSystemContext(); + asyncRenewRetentionLease( + leaderShardId, + retentionLeaseId, + remoteClient, + ActionListener.wrap( + r -> {}, + e -> { + assert e instanceof ElasticsearchSecurityException == false : e; + logger.warn(new ParameterizedMessage( + "{} background renewal of retention lease [{}] failed during restore", + shardId, + retentionLeaseId), + e); + })); + } + }, + RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(indexShard.indexSettings().getSettings()), + Ccr.CCR_THREAD_POOL_NAME); - Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); // TODO: There should be some local timeout. And if the remote cluster returns an unknown session // response, we should be able to retry by creating a new session. 
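// A self-contained sketch of the add/renew pattern used above. This is illustrative only: the shard ID, index UUID,
// lease ID, and client wiring are hypothetical placeholders; the request constructors, action instances, RETAIN_ALL,
// and the "ccr" source string match the ones used in this change.
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.shard.ShardId;

import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;

class RetentionLeaseSketch {

    static void addThenRenew(final Client remoteClient) {
        // hypothetical leader shard and lease ID, for illustration only
        final ShardId leaderShardId = new ShardId(new Index("leader", "leader-index-uuid"), 0);
        final String leaseId = "local-cluster/follower/follower-uuid-following-leader_alias/leader/leader-index-uuid";
        // add the lease; a RetentionLeaseAlreadyExistsException response means fall back to a renewal
        remoteClient.execute(
                RetentionLeaseActions.Add.INSTANCE,
                new RetentionLeaseActions.AddRequest(leaderShardId, leaseId, RETAIN_ALL, "ccr"),
                ActionListener.wrap(r -> {}, e -> {}));
        // renew the lease; a RetentionLeaseNotFoundException response means the lease expired and must be re-added
        remoteClient.execute(
                RetentionLeaseActions.Renew.INSTANCE,
                new RetentionLeaseActions.RenewRequest(leaderShardId, leaseId, RETAIN_ALL, "ccr"),
                ActionListener.wrap(r -> {}, e -> {}));
    }
}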
- String name = metadata.name(); - try (RestoreSession restoreSession = openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) { + try (RestoreSession restoreSession = openSession(metadata.name(), remoteClient, leaderShardId, indexShard, recoveryState)) { restoreSession.restoreFiles(); updateMappings(remoteClient, leaderIndex, restoreSession.mappingVersion, client, indexShard.routingEntry().index()); } catch (Exception e) { throw new IndexShardRestoreFailedException(indexShard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e); + } finally { + logger.trace("{} canceling background renewal of retention lease [{}] at the end of restore", shardId, retentionLeaseId); + renewable.cancel(); } } + private void createEmptyStore(final IndexShard indexShard, final ShardId shardId) { + final Store store = indexShard.store(); + store.incRef(); + try { + store.createEmpty(indexShard.indexSettings().getIndexMetaData().getCreationVersion().luceneVersion); + } catch (final EngineException | IOException e) { + throw new IndexShardRecoveryException(shardId, "failed to create empty store", e); + } finally { + store.decRef(); + } + } + + void acquireRetentionLeaseOnLeader( + final ShardId shardId, + final String retentionLeaseId, + final ShardId leaderShardId, + final Client remoteClient) { + logger.trace( + () -> new ParameterizedMessage("{} requesting leader to add retention lease [{}]", shardId, retentionLeaseId)); + final Optional maybeAddAlready = + syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient); + maybeAddAlready.ifPresent(addAlready -> { + logger.trace(() -> new ParameterizedMessage( + "{} retention lease [{}] already exists, requesting a renewal", + shardId, + retentionLeaseId), + addAlready); + final Optional maybeRenewNotFound = + syncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient); + maybeRenewNotFound.ifPresent(renewNotFound -> { + logger.trace(() -> new ParameterizedMessage( + "{} retention lease [{}] not found while attempting to renew, requesting a final add", + shardId, + retentionLeaseId), + renewNotFound); + final Optional maybeFallbackAddAlready = + syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient); + maybeFallbackAddAlready.ifPresent(fallbackAddAlready -> { + /* + * At this point we tried to add the lease and the retention lease already existed. By the time we tried to renew the + * lease, it expired or was removed. We tried to add the lease again and it already exists? Bail. 
+ */ + assert false : fallbackAddAlready; + throw fallbackAddAlready; + }); + }); + }); + } + + private Optional syncAddRetentionLease( + final ShardId leaderShardId, + final String retentionLeaseId, + final Client remoteClient) { + try { + final PlainActionFuture response = new PlainActionFuture<>(); + asyncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response); + response.actionGet(ccrSettings.getRecoveryActionTimeout()); + return Optional.empty(); + } catch (final RetentionLeaseAlreadyExistsException e) { + return Optional.of(e); + } + } + + private void asyncAddRetentionLease( + final ShardId leaderShardId, + final String retentionLeaseId, + final Client remoteClient, + final ActionListener listener) { + final RetentionLeaseActions.AddRequest request = + new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr"); + remoteClient.execute(RetentionLeaseActions.Add.INSTANCE, request, listener); + } + + private Optional syncRenewRetentionLease( + final ShardId leaderShardId, + final String retentionLeaseId, + final Client remoteClient) { + try { + final PlainActionFuture response = new PlainActionFuture<>(); + asyncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response); + response.actionGet(ccrSettings.getRecoveryActionTimeout()); + return Optional.empty(); + } catch (final RetentionLeaseNotFoundException e) { + return Optional.of(e); + } + } + + private void asyncRenewRetentionLease( + final ShardId leaderShardId, + final String retentionLeaseId, + final Client remoteClient, + final ActionListener listener) { + final RetentionLeaseActions.RenewRequest request = + new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr"); + remoteClient.execute(RetentionLeaseActions.Renew.INSTANCE, request, listener); + } + + // this setting is intentionally not registered, it is only used in tests + public static final Setting RETENTION_LEASE_RENEW_INTERVAL_SETTING = + Setting.timeSetting( + "index.ccr.retention_lease.renew_interval", + new TimeValue(5, TimeUnit.MINUTES), + new TimeValue(0, TimeUnit.MILLISECONDS), + Setting.Property.Dynamic, + Setting.Property.IndexScope); + @Override public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId leaderShardId) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); @@ -330,7 +487,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit } } - private RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard, + RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard, RecoveryState recoveryState) { String sessionUUID = UUIDs.randomBase64UUID(); PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index 561b76e8315..3ffacd81ad2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack; import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.action.ActionListener; import 
org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction; @@ -22,8 +23,11 @@ import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.analysis.common.CommonAnalysisPlugin; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -35,6 +39,7 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -59,6 +64,9 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.snapshots.RestoreInfo; +import org.elasticsearch.snapshots.RestoreService; +import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; @@ -101,10 +109,12 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.BooleanSupplier; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING; import static org.elasticsearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING; +import static org.elasticsearch.snapshots.RestoreService.restoreInProgress; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.empty; @@ -116,6 +126,10 @@ public abstract class CcrIntegTestCase extends ESTestCase { private static ClusterGroup clusterGroup; + protected Collection> nodePlugins() { + return Collections.emptyList(); + } + @Before public final void startClusters() throws Exception { if (clusterGroup != null && reuseClusters()) { @@ -226,7 +240,10 @@ public abstract class CcrIntegTestCase extends ESTestCase { @Override public Collection> nodePlugins() { - return Arrays.asList(LocalStateCcr.class, CommonAnalysisPlugin.class); + return Stream.concat( + Stream.of(LocalStateCcr.class, CommonAnalysisPlugin.class), + CcrIntegTestCase.this.nodePlugins().stream()) + .collect(Collectors.toList()); } @Override @@ -642,6 +659,61 @@ public abstract class CcrIntegTestCase extends ESTestCase { return lastKnownCount.get(); } + protected ActionListener waitForRestore( + final ClusterService clusterService, + final ActionListener listener) { + return new ActionListener() { + + 
@Override + public void onResponse(RestoreService.RestoreCompletionResponse restoreCompletionResponse) { + if (restoreCompletionResponse.getRestoreInfo() == null) { + final Snapshot snapshot = restoreCompletionResponse.getSnapshot(); + final String uuid = restoreCompletionResponse.getUuid(); + + final ClusterStateListener clusterStateListener = new ClusterStateListener() { + + @Override + public void clusterChanged(ClusterChangedEvent changedEvent) { + final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid); + final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid); + if (prevEntry == null) { + /* + * When there is a master failure after a restore has been started, this listener might not be registered + * on the current master and as such it might miss some intermediary cluster states due to batching. + * Clean up the listener in that case and acknowledge completion of restore operation to client. + */ + clusterService.removeListener(this); + listener.onResponse(null); + } else if (newEntry == null) { + clusterService.removeListener(this); + ImmutableOpenMap shards = prevEntry.shards(); + RestoreInfo ri = new RestoreInfo(prevEntry.snapshot().getSnapshotId().getName(), + prevEntry.indices(), + shards.size(), + shards.size() - RestoreService.failedShards(shards)); + logger.debug("restore of [{}] completed", snapshot); + listener.onResponse(ri); + } else { + // restore not completed yet, wait for next cluster state update + } + } + + }; + + clusterService.addListener(clusterStateListener); + } else { + listener.onResponse(restoreCompletionResponse.getRestoreInfo()); + } + } + + @Override + public void onFailure(Exception t) { + listener.onFailure(t); + } + + }; + } + static void removeCCRRelatedMetadataFromClusterState(ClusterService clusterService) throws Exception { CountDownLatch latch = new CountDownLatch(1); clusterService.submitStateUpdateTask("remove-ccr-related-metadata", new ClusterStateUpdateTask() { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index 45adec46a21..5357f9f01b2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -16,26 +16,20 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import 
org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.snapshots.RestoreInfo; import org.elasticsearch.snapshots.RestoreService; -import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportService; @@ -55,7 +49,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.singletonMap; -import static org.elasticsearch.snapshots.RestoreService.restoreInProgress; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -453,51 +446,4 @@ public class CcrRepositoryIT extends CcrIntegTestCase { assertThat(getResponse.getSource().get("f"), equalTo(value)); } - private ActionListener waitForRestore(ClusterService clusterService, - ActionListener listener) { - return new ActionListener() { - @Override - public void onResponse(RestoreService.RestoreCompletionResponse restoreCompletionResponse) { - if (restoreCompletionResponse.getRestoreInfo() == null) { - final Snapshot snapshot = restoreCompletionResponse.getSnapshot(); - final String uuid = restoreCompletionResponse.getUuid(); - - ClusterStateListener clusterStateListener = new ClusterStateListener() { - @Override - public void clusterChanged(ClusterChangedEvent changedEvent) { - final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid); - final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid); - if (prevEntry == null) { - // When there is a master failure after a restore has been started, this listener might not be registered - // on the current master and as such it might miss some intermediary cluster states due to batching. - // Clean up listener in that case and acknowledge completion of restore operation to client. - clusterService.removeListener(this); - listener.onResponse(null); - } else if (newEntry == null) { - clusterService.removeListener(this); - ImmutableOpenMap shards = prevEntry.shards(); - RestoreInfo ri = new RestoreInfo(prevEntry.snapshot().getSnapshotId().getName(), - prevEntry.indices(), - shards.size(), - shards.size() - RestoreService.failedShards(shards)); - logger.debug("restore of [{}] completed", snapshot); - listener.onResponse(ri); - } else { - // restore not completed yet, wait for next cluster state update - } - } - }; - - clusterService.addListener(clusterStateListener); - } else { - listener.onResponse(restoreCompletionResponse.getRestoreInfo()); - } - } - - @Override - public void onFailure(Exception t) { - listener.onFailure(t); - } - }; - } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java new file mode 100644 index 00000000000..bbf6b69a081 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -0,0 +1,407 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.ccr; + +import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.RetentionLease; +import org.elasticsearch.index.seqno.RetentionLeases; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.snapshots.RestoreInfo; +import org.elasticsearch.snapshots.RestoreService; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.CcrIntegTestCase; +import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; +import org.elasticsearch.xpack.ccr.repository.CcrRepository; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId; +import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; + +public class CcrRetentionLeaseIT extends CcrIntegTestCase { + + public static final class RetentionLeaseRenewIntervalSettingPlugin extends Plugin { + + @Override + public List> getSettings() { + return Collections.singletonList(CcrRepository.RETENTION_LEASE_RENEW_INTERVAL_SETTING); + } + + } + + public static final class RetentionLeaseSyncIntervalSettingPlugin extends Plugin { + + @Override + public List> getSettings() { + return Collections.singletonList(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING); + } + + } + + @Override + protected Collection> nodePlugins() { + return Stream.concat( + super.nodePlugins().stream(), + Stream.of(RetentionLeaseRenewIntervalSettingPlugin.class, RetentionLeaseSyncIntervalSettingPlugin.class)) + .collect(Collectors.toList()); + } + + private final IndicesOptions indicesOptions = 
IndicesOptions.strictSingleIndexNoExpandForbidClosed(); + + private RestoreSnapshotRequest setUpRestoreSnapshotRequest( + final String leaderIndex, + final int numberOfShards, + final int numberOfReplicas, + final String followerIndex, + final int numberOfDocuments) throws IOException { + final ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + final String chunkSize = new ByteSizeValue(randomFrom(4, 128, 1024), ByteSizeUnit.KB).getStringRep(); + settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), chunkSize)); + assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); + + final String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; + + final Map additionalSettings = new HashMap<>(); + additionalSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"); + additionalSettings.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200).getStringRep()); + final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalSettings); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderGreen(leaderIndex); + + logger.info("indexing [{}] docs", numberOfDocuments); + for (int i = 0; i < numberOfDocuments; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + leaderClient().prepareIndex(leaderIndex, "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + if (rarely()) { + leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get(); + } + } + + leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get(); + + final Settings.Builder settingsBuilder = Settings.builder() + .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex) + .put(CcrRepository.RETENTION_LEASE_RENEW_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200)) + .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); + return new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST) + .indexSettings(settingsBuilder) + .indices(leaderIndex) + .indicesOptions(indicesOptions) + .renamePattern("^(.*)$") + .renameReplacement(followerIndex) + .masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS)); + } + + public void testRetentionLeaseIsTakenAtTheStartOfRecovery() throws Exception { + final String leaderIndex = "leader"; + final int numberOfShards = randomIntBetween(1, 3); + final int numberOfReplicas = between(0, 1); + final String followerIndex = "follower"; + final int numberOfDocuments = scaledRandomIntBetween(1, 8192); + final RestoreSnapshotRequest restoreRequest = + setUpRestoreSnapshotRequest(leaderIndex, numberOfShards, numberOfReplicas, followerIndex, numberOfDocuments); + final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class); + final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); + + final PlainActionFuture future = PlainActionFuture.newFuture(); + restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); + + final ClusterStateResponse leaderIndexClusterState = + leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get(); + final String leaderUUID = 
leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID(); + + // ensure that a retention lease has been put in place on each shard + assertBusy(() -> { + final IndicesStatsResponse stats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); + final List shardStats = getShardStats(stats); + for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { + final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases(); + assertThat(currentRetentionLeases.leases(), hasSize(1)); + final ClusterStateResponse followerIndexClusterState = + followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); + final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); + final RetentionLease retentionLease = + currentRetentionLeases.leases().iterator().next(); + assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID))); + } + }); + + final RestoreInfo restoreInfo = future.actionGet(); + + assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards()); + assertEquals(0, restoreInfo.failedShards()); + for (int i = 0; i < numberOfDocuments; ++i) { + assertExpectedDocument(followerIndex, i); + } + + } + + public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception { + final String leaderIndex = "leader"; + final int numberOfShards = randomIntBetween(1, 3); + final int numberOfReplicas = between(0, 1); + final String followerIndex = "follower"; + final int numberOfDocuments = scaledRandomIntBetween(1, 8192); + final RestoreSnapshotRequest restoreRequest = + setUpRestoreSnapshotRequest(leaderIndex, numberOfShards, numberOfReplicas, followerIndex, numberOfDocuments); + final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class); + final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); + + final CountDownLatch latch = new CountDownLatch(1); + + // block the recovery from completing; this ensures the background sync is still running + final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); + for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { + final MockTransportService senderTransportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + final ClusterStateResponse leaderClusterState = leaderClient().admin().cluster().prepareState().clear().setNodes(true).get(); + for (final ObjectCursor receiverNode : leaderClusterState.getState().nodes().getDataNodes().values()) { + final MockTransportService receiverTransportService = + (MockTransportService) getLeaderCluster().getInstance(TransportService.class, receiverNode.value.getName()); + senderTransportService.addSendBehavior(receiverTransportService, + (connection, requestId, action, request, options) -> { + if (ClearCcrRestoreSessionAction.NAME.equals(action)) { + try { + latch.await(); + } catch (final InterruptedException e) { + fail(e.toString()); + } + } + connection.sendRequest(requestId, action, request, options); + }); + + } + + } + + 
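// Note on the technique above (a summary hedged from the test itself): delaying ClearCcrRestoreSessionAction behind the
// latch keeps each restore session open, so the background renewal task scheduled by CcrRepository keeps firing and the
// assertions below can observe the lease timestamps increasing before the latch is counted down.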
final PlainActionFuture future = PlainActionFuture.newFuture(); + restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); + + final ClusterStateResponse leaderIndexClusterState = + leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get(); + final String leaderUUID = leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID(); + + try { + // ensure that a retention lease has been put in place on each shard, and grab a copy of them + final List retentionLeases = new ArrayList<>(); + assertBusy(() -> { + retentionLeases.clear(); + final IndicesStatsResponse stats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); + final List shardStats = getShardStats(stats); + for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { + final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases(); + assertThat(currentRetentionLeases.leases(), hasSize(1)); + final ClusterStateResponse followerIndexClusterState = + followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); + final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); + final RetentionLease retentionLease = + currentRetentionLeases.leases().iterator().next(); + assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID))); + retentionLeases.add(currentRetentionLeases); + } + }); + + // now ensure that the retention leases are being renewed + assertBusy(() -> { + final IndicesStatsResponse stats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); + final List shardStats = getShardStats(stats); + for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { + final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases(); + assertThat(currentRetentionLeases.leases(), hasSize(1)); + final ClusterStateResponse followerIndexClusterState = + followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); + final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); + final RetentionLease retentionLease = + currentRetentionLeases.leases().iterator().next(); + assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID))); + // we assert that retention leases are being renewed by an increase in the timestamp + assertThat(retentionLease.timestamp(), greaterThan(retentionLeases.get(i).leases().iterator().next().timestamp())); + } + }); + latch.countDown(); + } finally { + for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { + final MockTransportService senderTransportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + senderTransportService.clearAllRules(); + } + } + + final RestoreInfo restoreInfo = future.actionGet(); + + assertEquals(restoreInfo.totalShards(), 
restoreInfo.successfulShards()); + assertEquals(0, restoreInfo.failedShards()); + for (int i = 0; i < numberOfDocuments; i++) { + assertExpectedDocument(followerIndex, i); + } + + } + + public void testRetentionLeasesAreNotBeingRenewedAfterRecoveryCompletes() throws Exception { + final String leaderIndex = "leader"; + final int numberOfShards = randomIntBetween(1, 3); + final int numberOfReplicas = between(0, 1); + final String followerIndex = "follower"; + final int numberOfDocuments = scaledRandomIntBetween(1, 8192); + final RestoreSnapshotRequest restoreRequest = + setUpRestoreSnapshotRequest(leaderIndex, numberOfShards, numberOfReplicas, followerIndex, numberOfDocuments); + final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class); + final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); + + final PlainActionFuture future = PlainActionFuture.newFuture(); + restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); + + final RestoreInfo restoreInfo = future.actionGet(); + final long start = System.nanoTime(); + + final ClusterStateResponse leaderIndexClusterState = + leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get(); + final String leaderUUID = leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID(); + + // sample the leases after recovery + final List retentionLeases = new ArrayList<>(); + assertBusy(() -> { + retentionLeases.clear(); + final IndicesStatsResponse stats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); + final List shardStats = getShardStats(stats); + for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { + final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases(); + assertThat(currentRetentionLeases.leases(), hasSize(1)); + final ClusterStateResponse followerIndexClusterState = + followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); + final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); + final RetentionLease retentionLease = + currentRetentionLeases.leases().iterator().next(); + final String expectedRetentionLeaseId = retentionLeaseId( + getFollowerCluster().getClusterName(), + new Index(followerIndex, followerUUID), + getLeaderCluster().getClusterName(), + new Index(leaderIndex, leaderUUID)); + assertThat(retentionLease.id(), equalTo(expectedRetentionLeaseId)); + retentionLeases.add(currentRetentionLeases); + } + }); + + final long end = System.nanoTime(); + Thread.sleep(Math.max(0, randomIntBetween(2, 4) * 200 - TimeUnit.NANOSECONDS.toMillis(end - start))); + + // now ensure that the retention leases are the same + assertBusy(() -> { + final IndicesStatsResponse stats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); + final List shardStats = getShardStats(stats); + for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { + final RetentionLeases currentRetentionLeases = 
shardStats.get(i).getRetentionLeaseStats().retentionLeases(); + assertThat(currentRetentionLeases.leases(), hasSize(1)); + final ClusterStateResponse followerIndexClusterState = + followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); + final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); + final RetentionLease retentionLease = + currentRetentionLeases.leases().iterator().next(); + assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID))); + // we assert that retention leases are not being renewed by an unchanged timestamp + assertThat(retentionLease.timestamp(), equalTo(retentionLeases.get(i).leases().iterator().next().timestamp())); + } + }); + + assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards()); + assertEquals(0, restoreInfo.failedShards()); + for (int i = 0; i < numberOfDocuments; ++i) { + assertExpectedDocument(followerIndex, i); + } + } + + /** + * Extract the shard stats from an indices stats response, with the stats ordered by shard ID with primaries first. This is to have a + * consistent ordering when comparing two responses. + * + * @param stats the indices stats + * @return the shard stats in sorted order with (shard ID, primary) as the sort key + */ + private List getShardStats(final IndicesStatsResponse stats) { + return Arrays.stream(stats.getShards()) + .sorted((s, t) -> { + if (s.getShardRouting().shardId().id() == t.getShardRouting().shardId().id()) { + return Boolean.compare(s.getShardRouting().primary(), t.getShardRouting().primary()); + } else { + return Integer.compare(s.getShardRouting().shardId().id(), t.getShardRouting().shardId().id()); + } + }) + .collect(Collectors.toList()); + } + + private String getRetentionLeaseId(String followerIndex, String followerUUID, String leaderIndex, String leaderUUID) { + return retentionLeaseId( + getFollowerCluster().getClusterName(), + new Index(followerIndex, followerUUID), + getLeaderCluster().getClusterName(), + new Index(leaderIndex, leaderUUID)); + } + + private void assertExpectedDocument(final String followerIndex, final int value) { + final GetResponse getResponse = followerClient().prepareGet(followerIndex, "doc", Integer.toString(value)).get(); + assertTrue("doc with id [" + value + "] is missing", getResponse.isExists()); + assertTrue(getResponse.getSource().containsKey("f")); + assertThat(getResponse.getSource().get("f"), equalTo(value)); + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 681105efa6a..aef3df9e556 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ccr; +import com.carrotsearch.hppc.cursors.ObjectCursor; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ExceptionsHelper; @@ -15,6 +16,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import 
org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; @@ -43,6 +45,8 @@ import org.elasticsearch.cluster.health.ClusterIndexHealth; import org.elasticsearch.cluster.health.ClusterShardHealth; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesReference; @@ -53,8 +57,10 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.RetentionLeaseActions; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.rest.RestStatus; @@ -87,11 +93,13 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BooleanSupplier; +import java.util.function.Consumer; import java.util.stream.Collectors; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -982,9 +990,70 @@ public class IndexFollowingIT extends CcrIntegTestCase { } public void testIndexFallBehind() throws Exception { + runFallBehindTest( + () -> { + // we have to remove the retention leases on the leader shards to ensure the follower falls behind + final ClusterStateResponse followerIndexClusterState = + followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices("index2").get(); + final String followerUUID = followerIndexClusterState.getState().metaData().index("index2").getIndexUUID(); + final ClusterStateResponse leaderIndexClusterState = + leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices("index1").get(); + final String leaderUUID = leaderIndexClusterState.getState().metaData().index("index1").getIndexUUID(); + + final RoutingTable leaderRoutingTable = leaderClient() + .admin() + .cluster() + .prepareState() + .clear() + .setIndices("index1") + .setRoutingTable(true) + .get() + .getState() + .routingTable(); + + final String retentionLeaseId = retentionLeaseId( + getFollowerCluster().getClusterName(), + new Index("index2", followerUUID), + getLeaderCluster().getClusterName(), + new Index("index1", leaderUUID)); + + for (final ObjectCursor shardRoutingTable + : leaderRoutingTable.index("index1").shards().values()) { + final ShardId shardId = shardRoutingTable.value.shardId(); + leaderClient().execute( + 
RetentionLeaseActions.Remove.INSTANCE, + new RetentionLeaseActions.RemoveRequest(shardId, retentionLeaseId)) + .get(); + } + }, + exceptions -> assertThat(exceptions.size(), greaterThan(0))); + } + + public void testIndexDoesNotFallBehind() throws Exception { + runFallBehindTest( + () -> {}, + exceptions -> assertThat(exceptions.size(), equalTo(0))); + } + + /** + * Runs a fall-behind test. In this test, we construct a situation where a follower is paused. While the follower is paused, we index + * more documents that cause soft deletes on the leader, flush them, and run a force merge. This sets up a situation where the + * operations will not necessarily be retained on the leader; with retention leases in place, we would actually expect the operations + * to be there. After pausing the follower, the specified callback is executed, giving a test an opportunity to set up its + * preconditions. For example, a test might remove all the retention leases on the leader so that the follower will fall behind when it + * is resumed, because the operations will no longer be held on the leader. The specified exceptions callback is invoked after resuming + * the follower, giving a test an opportunity to assert on the resource not found exceptions (whether present or absent). + * + * @param afterPausingFollower the callback to run after pausing the follower + * @param exceptionConsumer the callback to run on the collection of resource not found exceptions after resuming the follower + * @throws Exception if a checked exception is thrown during the test + */ + private void runFallBehindTest( + final CheckedRunnable<Exception> afterPausingFollower, + final Consumer<Collection<ResourceNotFoundException>> exceptionConsumer) throws Exception { final int numberOfPrimaryShards = randomIntBetween(1, 3); final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), - singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); ensureLeaderYellow("index1"); @@ -1008,6 +1077,8 @@ public class IndexFollowingIT extends CcrIntegTestCase { pauseFollow("index2"); + afterPausingFollower.run(); + for (int i = 0; i < numDocs; i++) { final String source = String.format(Locale.ROOT, "{\"f\":%d}", i * 2); leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); @@ -1024,20 +1095,19 @@ public class IndexFollowingIT extends CcrIntegTestCase { assertBusy(() -> { List<ShardFollowNodeTaskStatus> statuses = getFollowTaskStatuses("index2"); Set<ResourceNotFoundException> exceptions = statuses.stream() - .map(ShardFollowNodeTaskStatus::getFatalException) - .filter(Objects::nonNull) - .map(ExceptionsHelper::unwrapCause) - .filter(e -> e instanceof ResourceNotFoundException) - .map(e -> (ResourceNotFoundException) e) - .filter(e -> e.getMetadataKeys().contains("es.requested_operations_missing")) - .collect(Collectors.toSet()); - assertThat(exceptions.size(), greaterThan(0)); + .map(ShardFollowNodeTaskStatus::getFatalException) + .filter(Objects::nonNull) + .map(ExceptionsHelper::unwrapCause) + .filter(e -> e instanceof ResourceNotFoundException) + .map(e -> (ResourceNotFoundException) e) + .filter(e -> e.getMetadataKeys().contains("es.requested_operations_missing")) + .collect(Collectors.toSet()); + exceptionConsumer.accept(exceptions); }); followerClient().admin().indices().prepareClose("index2").get(); pauseFollow("index2"); - final
PutFollowAction.Request followRequest2 = putFollow("index1", "index2"); PutFollowAction.Response response2 = followerClient().execute(PutFollowAction.INSTANCE, followRequest2).get(); assertTrue(response2.isFollowIndexCreated()); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRepositoryRetentionLeaseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRepositoryRetentionLeaseTests.java new file mode 100644 index 00000000000..2e382f73930 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRepositoryRetentionLeaseTests.java @@ -0,0 +1,191 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.repository; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.seqno.RetentionLeaseActions; +import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException; +import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ccr.CcrLicenseChecker; +import org.elasticsearch.xpack.ccr.CcrSettings; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; +import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +public class CcrRepositoryRetentionLeaseTests extends ESTestCase { + + public void testWhenRetentionLeaseAlreadyExistsWeTryToRenewIt() { + final RepositoryMetaData repositoryMetaData = mock(RepositoryMetaData.class); + when(repositoryMetaData.name()).thenReturn(CcrRepository.NAME_PREFIX); + final Set<Setting<?>> settings = + Stream.concat( + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), + CcrSettings.getSettings().stream().filter(Setting::hasNodeScope)) + .collect(Collectors.toSet()); + + final CcrRepository repository = new CcrRepository( + repositoryMetaData, + mock(Client.class), + new CcrLicenseChecker(() -> true, () -> true), + Settings.EMPTY, + new CcrSettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, settings)), + mock(ThreadPool.class)); + + final ShardId followerShardId = new ShardId(new Index("follower-index-name", "follower-index-uuid"), 0); + final ShardId
leaderShardId = new ShardId(new Index("leader-index-name", "leader-index-uuid"), 0); + + final String retentionLeaseId = + retentionLeaseId("local-cluster", followerShardId.getIndex(), "remote-cluster", leaderShardId.getIndex()); + + // simulate that the retention lease already exists on the leader, and verify that we attempt to renew it + final Client remoteClient = mock(Client.class); + final ArgumentCaptor<RetentionLeaseActions.AddRequest> addRequestCaptor = + ArgumentCaptor.forClass(RetentionLeaseActions.AddRequest.class); + doAnswer( + invocationOnMock -> { + @SuppressWarnings("unchecked") final ActionListener<RetentionLeaseActions.Response> listener = + (ActionListener<RetentionLeaseActions.Response>) invocationOnMock.getArguments()[2]; + listener.onFailure(new RetentionLeaseAlreadyExistsException(retentionLeaseId)); + return null; + }) + .when(remoteClient) + .execute(same(RetentionLeaseActions.Add.INSTANCE), addRequestCaptor.capture(), any()); + final ArgumentCaptor<RetentionLeaseActions.RenewRequest> renewRequestCaptor = + ArgumentCaptor.forClass(RetentionLeaseActions.RenewRequest.class); + doAnswer( + invocationOnMock -> { + @SuppressWarnings("unchecked") final ActionListener<RetentionLeaseActions.Response> listener = + (ActionListener<RetentionLeaseActions.Response>) invocationOnMock.getArguments()[2]; + listener.onResponse(new RetentionLeaseActions.Response()); + return null; + }) + .when(remoteClient) + .execute(same(RetentionLeaseActions.Renew.INSTANCE), renewRequestCaptor.capture(), any()); + + repository.acquireRetentionLeaseOnLeader(followerShardId, retentionLeaseId, leaderShardId, remoteClient); + + verify(remoteClient).execute(same(RetentionLeaseActions.Add.INSTANCE), any(RetentionLeaseActions.AddRequest.class), any()); + assertThat(addRequestCaptor.getValue().getShardId(), equalTo(leaderShardId)); + assertThat(addRequestCaptor.getValue().getId(), equalTo(retentionLeaseId)); + assertThat(addRequestCaptor.getValue().getRetainingSequenceNumber(), equalTo(RETAIN_ALL)); + assertThat(addRequestCaptor.getValue().getSource(), equalTo("ccr")); + + verify(remoteClient).execute(same(RetentionLeaseActions.Renew.INSTANCE), any(RetentionLeaseActions.RenewRequest.class), any()); + assertThat(renewRequestCaptor.getValue().getShardId(), equalTo(leaderShardId)); + assertThat(renewRequestCaptor.getValue().getId(), equalTo(retentionLeaseId)); + assertThat(renewRequestCaptor.getValue().getRetainingSequenceNumber(), equalTo(RETAIN_ALL)); + assertThat(renewRequestCaptor.getValue().getSource(), equalTo("ccr")); + + verifyNoMoreInteractions(remoteClient); + } + + public void testWhenRetentionLeaseExpiresBeforeWeCanRenewIt() { + final RepositoryMetaData repositoryMetaData = mock(RepositoryMetaData.class); + when(repositoryMetaData.name()).thenReturn(CcrRepository.NAME_PREFIX); + final Set<Setting<?>> settings = + Stream.concat( + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), + CcrSettings.getSettings().stream().filter(Setting::hasNodeScope)) + .collect(Collectors.toSet()); + + final CcrRepository repository = new CcrRepository( + repositoryMetaData, + mock(Client.class), + new CcrLicenseChecker(() -> true, () -> true), + Settings.EMPTY, + new CcrSettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, settings)), + mock(ThreadPool.class)); + + final ShardId followerShardId = new ShardId(new Index("follower-index-name", "follower-index-uuid"), 0); + final ShardId leaderShardId = new ShardId(new Index("leader-index-name", "leader-index-uuid"), 0); + + final String retentionLeaseId = + retentionLeaseId("local-cluster", followerShardId.getIndex(), "remote-cluster", leaderShardId.getIndex()); + + // simulate that the retention lease already exists on the leader, expires before we can renew it,
and verify that we attempt to add it + final Client remoteClient = mock(Client.class); + final ArgumentCaptor<RetentionLeaseActions.AddRequest> addRequestCaptor = + ArgumentCaptor.forClass(RetentionLeaseActions.AddRequest.class); + final PlainActionFuture<RetentionLeaseActions.Response> response = new PlainActionFuture<>(); + response.onResponse(new RetentionLeaseActions.Response()); + doAnswer( + new Answer<Void>() { + + final AtomicBoolean firstInvocation = new AtomicBoolean(true); + + @Override + public Void answer(final InvocationOnMock invocationOnMock) { + @SuppressWarnings("unchecked") final ActionListener<RetentionLeaseActions.Response> listener = + (ActionListener<RetentionLeaseActions.Response>) invocationOnMock.getArguments()[2]; + if (firstInvocation.compareAndSet(true, false)) { + listener.onFailure(new RetentionLeaseAlreadyExistsException(retentionLeaseId)); + } else { + listener.onResponse(new RetentionLeaseActions.Response()); + } + return null; + } + + }) + .when(remoteClient).execute(same(RetentionLeaseActions.Add.INSTANCE), addRequestCaptor.capture(), any()); + final ArgumentCaptor<RetentionLeaseActions.RenewRequest> renewRequestCaptor = + ArgumentCaptor.forClass(RetentionLeaseActions.RenewRequest.class); + doAnswer( + invocationOnMock -> { + @SuppressWarnings("unchecked") final ActionListener<RetentionLeaseActions.Response> listener = + (ActionListener<RetentionLeaseActions.Response>) invocationOnMock.getArguments()[2]; + listener.onFailure(new RetentionLeaseNotFoundException(retentionLeaseId)); + return null; + } + ).when(remoteClient) + .execute(same(RetentionLeaseActions.Renew.INSTANCE), renewRequestCaptor.capture(), any()); + + repository.acquireRetentionLeaseOnLeader(followerShardId, retentionLeaseId, leaderShardId, remoteClient); + + verify(remoteClient, times(2)) + .execute(same(RetentionLeaseActions.Add.INSTANCE), any(RetentionLeaseActions.AddRequest.class), any()); + assertThat(addRequestCaptor.getValue().getShardId(), equalTo(leaderShardId)); + assertThat(addRequestCaptor.getValue().getId(), equalTo(retentionLeaseId)); + assertThat(addRequestCaptor.getValue().getRetainingSequenceNumber(), equalTo(RETAIN_ALL)); + assertThat(addRequestCaptor.getValue().getSource(), equalTo("ccr")); + + verify(remoteClient).execute(same(RetentionLeaseActions.Renew.INSTANCE), any(RetentionLeaseActions.RenewRequest.class), any()); + assertThat(renewRequestCaptor.getValue().getShardId(), equalTo(leaderShardId)); + assertThat(renewRequestCaptor.getValue().getId(), equalTo(retentionLeaseId)); + assertThat(renewRequestCaptor.getValue().getRetainingSequenceNumber(), equalTo(RETAIN_ALL)); + assertThat(renewRequestCaptor.getValue().getSource(), equalTo("ccr")); + + verifyNoMoreInteractions(remoteClient); + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java index ec3305a963c..523a8101749 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.security.authz.privilege; +import org.elasticsearch.index.seqno.RetentionLeaseActions; import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction; import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; import org.elasticsearch.transport.TransportActionProxy; @@ -29,6 +30,8 @@ public final class SystemPrivilege extends Privilege { "indices:admin/seq_no/global_checkpoint_sync*", // needed for global checkpoint syncs
RetentionLeaseSyncAction.ACTION_NAME + "*", // needed for retention lease syncs RetentionLeaseBackgroundSyncAction.ACTION_NAME + "*", // needed for background retention lease syncs + RetentionLeaseActions.Add.ACTION_NAME + "*", // needed for CCR to add retention leases + RetentionLeaseActions.Renew.ACTION_NAME + "*", // needed for CCR to renew retention leases "indices:admin/settings/update" // needed for DiskThresholdMonitor.markIndicesReadOnly ); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java index 906b00ccab0..46db9e83f77 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java @@ -133,6 +133,10 @@ public class PrivilegeTests extends ESTestCase { assertThat(predicate.test("indices:admin/seq_no/retention_lease_background_sync"), is(true)); assertThat(predicate.test("indices:admin/seq_no/retention_lease_background_sync[p]"), is(true)); assertThat(predicate.test("indices:admin/seq_no/retention_lease_background_sync[r]"), is(true)); + assertThat(predicate.test("indices:admin/seq_no/add_retention_lease"), is(true)); + assertThat(predicate.test("indices:admin/seq_no/add_retention_lease[s]"), is(true)); + assertThat(predicate.test("indices:admin/seq_no/renew_retention_lease"), is(true)); + assertThat(predicate.test("indices:admin/seq_no/renew_retention_lease[s]"), is(true)); assertThat(predicate.test("indices:admin/settings/update"), is(true)); assertThat(predicate.test("indices:admin/settings/foo"), is(false)); } diff --git a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java index b3c93acb97b..8b4c21ee086 100644 --- a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java @@ -278,6 +278,7 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase { } } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37165") public void testUnfollowInjectedBeforeShrink() throws Exception { final String indexName = "shrink-test"; final String shrunkenIndexName = "shrink-" + indexName; diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java index 7c4cd564e99..bde5949d378 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java @@ -261,6 +261,8 @@ public class AuthorizationServiceTests extends ESTestCase { "indices:admin/seq_no/global_checkpoint_sync", "indices:admin/seq_no/retention_lease_sync", "indices:admin/seq_no/retention_lease_background_sync", + "indices:admin/seq_no/add_retention_lease", + "indices:admin/seq_no/renew_retention_lease", "indices:admin/settings/update" }; for (String action : actions) { authorize(authentication, action, 
request);
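
For reference, here is a minimal, synchronous sketch of the acquire-or-renew flow that CcrRepositoryRetentionLeaseTests exercises above: try to add a retention lease retaining all operations; if a lease with the same ID already exists, renew it instead; and if the lease expires between the failed add and the renew, fall back to adding it again. This is not the CcrRepository implementation from this change (which is asynchronous and listener-based); the class name and the blocking PlainActionFuture style are illustrative assumptions, and the AddRequest/RenewRequest constructor arguments are inferred from the getters asserted in the tests (shard ID, lease ID, RETAIN_ALL, and the "ccr" source).

import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
import org.elasticsearch.index.shard.ShardId;

import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;

// hypothetical class, for illustration only; not part of this change
class RetentionLeaseAcquisitionSketch {

    /**
     * Mirrors the add -> renew -> add-again behavior verified by the two tests above. The follower
     * shard ID is accepted to match the signature under test, but is unused in this sketch.
     */
    void acquireRetentionLeaseOnLeader(
            final ShardId followerShardId,
            final String retentionLeaseId,
            final ShardId leaderShardId,
            final Client remoteClient) {
        try {
            // common case: no lease with this ID exists yet, so add one retaining all operations
            addLease(leaderShardId, retentionLeaseId, remoteClient);
        } catch (final RetentionLeaseAlreadyExistsException e) {
            try {
                // a lease left over from an earlier follow attempt exists; renew it instead
                final PlainActionFuture<RetentionLeaseActions.Response> renewFuture = new PlainActionFuture<>();
                remoteClient.execute(
                        RetentionLeaseActions.Renew.INSTANCE,
                        new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr"),
                        renewFuture);
                renewFuture.actionGet();
            } catch (final RetentionLeaseNotFoundException f) {
                // the lease expired between the failed add and the renew; fall back to adding it again
                addLease(leaderShardId, retentionLeaseId, remoteClient);
            }
        }
    }

    private void addLease(final ShardId leaderShardId, final String retentionLeaseId, final Client remoteClient) {
        final PlainActionFuture<RetentionLeaseActions.Response> addFuture = new PlainActionFuture<>();
        remoteClient.execute(
                RetentionLeaseActions.Add.INSTANCE,
                new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr"),
                addFuture);
        addFuture.actionGet();
    }

}

The retry of the add after a failed renew matters because a lease can expire in the window between the RetentionLeaseAlreadyExistsException and the renew attempt; the second test (testWhenRetentionLeaseExpiresBeforeWeCanRenewIt) pins down exactly that race by failing the first add, failing the renew with RetentionLeaseNotFoundException, and then verifying that Add is executed a second time.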