Integrate retention leases into recovery from remote (#38829)

This commit is the first step in integrating shard history retention
leases with CCR. In this commit we integrate shard history retention
leases with recovery from remote. Before we start transferring files, we
acquire a retention lease on the leader primary. Then, during the
file-copy phase, we periodically renew that lease in the background.
Finally, when recovery from remote is complete, we cancel the background
renewal of the retention lease.
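
In outline, the new lifecycle is small enough to sketch in plain, self-contained Java. The sketch below is illustrative only: addLease, renewLease, and copyFiles are hypothetical stand-ins for the retention lease transport actions and the file-based restore that this commit wires together in CcrRepository.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public final class LeaseLifecycleSketch {

    public static void main(final String[] args) {
        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // before transferring files, acquire a retention lease on the leader primary
        addLease("retention-lease-id");
        // during the file-copy phase, renew the lease on a fixed delay in the background;
        // the 30-second delay is illustrative, the real interval is controlled by
        // index.ccr.retention_lease.renew_interval (default five minutes)
        final ScheduledFuture<?> renewal =
                scheduler.scheduleWithFixedDelay(() -> renewLease("retention-lease-id"), 30, 30, TimeUnit.SECONDS);
        try {
            copyFiles(); // the lease holds the required history in place while files are copied
        } finally {
            // when recovery from remote is complete, cancel the background renewal
            renewal.cancel(false);
            scheduler.shutdown();
        }
    }

    private static void addLease(final String id) { /* stand-in for the add retention lease action */ }

    private static void renewLease(final String id) { /* stand-in for the renew retention lease action */ }

    private static void copyFiles() { /* stand-in for the file restore */ }
}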
Author: Jason Tedor, 2019-02-16 15:28:04 -05:00
Parent: b1c1daa63f
Commit: a5ce1e0bec
14 changed files with 991 additions and 98 deletions

View File

@ -115,7 +115,7 @@ public class RetentionLeaseActions {
}
@Override
protected Response shardOperation(final T request, final ShardId shardId) throws IOException {
protected Response shardOperation(final T request, final ShardId shardId) {
throw new UnsupportedOperationException();
}
@ -136,10 +136,10 @@ public class RetentionLeaseActions {
public static class Add extends Action<Response> {
public static final Add INSTANCE = new Add();
public static final String NAME = "indices:admin/seq_no/add_retention_lease";
public static final String ACTION_NAME = "indices:admin/seq_no/add_retention_lease";
private Add() {
super(NAME);
super(ACTION_NAME);
}
public static class TransportAction extends TransportRetentionLeaseAction<AddRequest> {
@ -153,7 +153,7 @@ public class RetentionLeaseActions {
final IndexNameExpressionResolver indexNameExpressionResolver,
final IndicesService indicesService) {
super(
NAME,
ACTION_NAME,
threadPool,
clusterService,
transportService,
@ -186,10 +186,10 @@ public class RetentionLeaseActions {
public static class Renew extends Action<Response> {
public static final Renew INSTANCE = new Renew();
public static final String NAME = "indices:admin/seq_no/renew_retention_lease";
public static final String ACTION_NAME = "indices:admin/seq_no/renew_retention_lease";
private Renew() {
super(NAME);
super(ACTION_NAME);
}
public static class TransportAction extends TransportRetentionLeaseAction<RenewRequest> {
@ -203,7 +203,7 @@ public class RetentionLeaseActions {
final IndexNameExpressionResolver indexNameExpressionResolver,
final IndicesService indicesService) {
super(
NAME,
ACTION_NAME,
threadPool,
clusterService,
transportService,
@ -232,10 +232,10 @@ public class RetentionLeaseActions {
public static class Remove extends Action<Response> {
public static final Remove INSTANCE = new Remove();
public static final String NAME = "indices:admin/seq_no/remove_retention_lease";
public static final String ACTION_NAME = "indices:admin/seq_no/remove_retention_lease";
private Remove() {
super(NAME);
super(ACTION_NAME);
}
public static class TransportAction extends TransportRetentionLeaseAction<RemoveRequest> {
@ -249,7 +249,7 @@ public class RetentionLeaseActions {
final IndexNameExpressionResolver indexNameExpressionResolver,
final IndicesService indicesService) {
super(
NAME,
ACTION_NAME,
threadPool,
clusterService,
transportService,

View File

@ -27,7 +27,7 @@ import java.util.Objects;
public class RetentionLeaseAlreadyExistsException extends ResourceAlreadyExistsException {
RetentionLeaseAlreadyExistsException(final String id) {
public RetentionLeaseAlreadyExistsException(final String id) {
super("retention lease with ID [" + Objects.requireNonNull(id) + "] already exists");
}

View File

@ -27,7 +27,7 @@ import java.util.Objects;
public class RetentionLeaseNotFoundException extends ResourceNotFoundException {
RetentionLeaseNotFoundException(final String id) {
public RetentionLeaseNotFoundException(final String id) {
super("retention lease with ID [" + Objects.requireNonNull(id) + "] not found");
}

View File

@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr;
import org.elasticsearch.index.Index;
import java.util.Locale;
public class CcrRetentionLeases {
/**
* The retention lease ID used by followers.
*
* @param localClusterName the local cluster name
* @param followerIndex the follower index
* @param remoteClusterAlias the remote cluster alias
* @param leaderIndex the leader index
* @return the retention lease ID
*/
public static String retentionLeaseId(
final String localClusterName,
final Index followerIndex,
final String remoteClusterAlias,
final Index leaderIndex) {
return String.format(
Locale.ROOT,
"%s/%s/%s-following-%s/%s/%s",
localClusterName,
followerIndex.getName(),
followerIndex.getUUID(),
remoteClusterAlias,
leaderIndex.getName(),
leaderIndex.getUUID());
}
}
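
For illustration, with hypothetical cluster names and index UUIDs, the ID renders as a single slash-delimited string that embeds both sides of the following relationship:

final String id = CcrRetentionLeases.retentionLeaseId(
        "local-cluster",
        new Index("follower", "follower-uuid"),
        "remote-cluster",
        new Index("leader", "leader-uuid"));
// id is "local-cluster/follower/follower-uuid-following-remote-cluster/leader/leader-uuid"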

View File

@ -8,8 +8,12 @@ package org.elasticsearch.xpack.ccr.repository;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
@ -19,6 +23,7 @@ import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.ListenerTimeouts;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@ -30,13 +35,18 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardRecoveryException;
import org.elasticsearch.index.shard.ShardId;
@ -56,6 +66,7 @@ import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
@ -76,12 +87,16 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId;
/**
@ -90,6 +105,8 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
*/
public class CcrRepository extends AbstractLifecycleComponent implements Repository {
private static final Logger logger = LogManager.getLogger(CcrRepository.class);
public static final String LATEST = "_latest_";
public static final String TYPE = "_ccr_";
public static final String NAME_PREFIX = "_ccr_";
@ -98,6 +115,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
private final RepositoryMetaData metadata;
private final CcrSettings ccrSettings;
private final String localClusterName;
private final String remoteClusterAlias;
private final Client client;
private final CcrLicenseChecker ccrLicenseChecker;
@ -109,6 +127,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
CcrSettings ccrSettings, ThreadPool threadPool) {
this.metadata = metadata;
this.ccrSettings = ccrSettings;
this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings).value();
assert metadata.name().startsWith(NAME_PREFIX) : "CcrRepository metadata.name() must start with: " + NAME_PREFIX;
this.remoteClusterAlias = Strings.split(metadata.name(), NAME_PREFIX)[1];
this.ccrLicenseChecker = ccrLicenseChecker;
@ -136,10 +155,14 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
return metadata;
}
private Client getRemoteClusterClient() {
return client.getRemoteClusterClient(remoteClusterAlias);
}
@Override
public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
Client remoteClient = getRemoteClusterClient();
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true).setNodes(true)
.get(ccrSettings.getRecoveryActionTimeout());
ImmutableOpenMap<String, IndexMetaData> indicesMap = response.getState().metaData().indices();
@ -152,7 +175,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
@Override
public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
Client remoteClient = getRemoteClusterClient();
// We set a single dummy index name to avoid fetching all the index data
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest("dummy_index_name");
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest)
@ -164,7 +187,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId index) throws IOException {
assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
String leaderIndex = index.getName();
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
Client remoteClient = getRemoteClusterClient();
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex);
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest)
@ -203,7 +226,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
@Override
public RepositoryData getRepositoryData() {
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
Client remoteClient = getRemoteClusterClient();
ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetaData(true)
.get(ccrSettings.getRecoveryActionTimeout());
MetaData remoteMetaData = response.getState().getMetaData();
@ -280,33 +303,167 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId,
RecoveryState recoveryState) {
// TODO: Add timeouts to network calls / the restore process.
final Store store = indexShard.store();
store.incRef();
try {
store.createEmpty(indexShard.indexSettings().getIndexMetaData().getCreationVersion().luceneVersion);
} catch (EngineException | IOException e) {
throw new IndexShardRecoveryException(shardId, "failed to create empty store", e);
} finally {
store.decRef();
createEmptyStore(indexShard, shardId);
final Map<String, String> ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
final String leaderIndexName = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY);
final String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
final Index leaderIndex = new Index(leaderIndexName, leaderUUID);
final ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId());
final Client remoteClient = getRemoteClusterClient();
final String retentionLeaseId =
retentionLeaseId(localClusterName, indexShard.shardId().getIndex(), remoteClusterAlias, leaderIndex);
acquireRetentionLeaseOnLeader(shardId, retentionLeaseId, leaderShardId, remoteClient);
// schedule renewals to run during the restore
final Scheduler.Cancellable renewable = threadPool.scheduleWithFixedDelay(
() -> {
logger.trace("{} background renewal of retention lease [{}] during restore", shardId, retentionLeaseId);
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we have to execute under the system context so that if security is enabled the renewal is authorized
threadContext.markAsSystemContext();
asyncRenewRetentionLease(
leaderShardId,
retentionLeaseId,
remoteClient,
ActionListener.wrap(
r -> {},
e -> {
assert e instanceof ElasticsearchSecurityException == false : e;
logger.warn(new ParameterizedMessage(
"{} background renewal of retention lease [{}] failed during restore",
shardId,
retentionLeaseId),
e);
}));
}
},
RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(indexShard.indexSettings().getSettings()),
Ccr.CCR_THREAD_POOL_NAME);
Map<String, String> ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
Index leaderIndex = new Index(shardId.getIndexName(), leaderUUID);
ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId());
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
// TODO: There should be some local timeout. And if the remote cluster returns an unknown session
// response, we should be able to retry by creating a new session.
String name = metadata.name();
try (RestoreSession restoreSession = openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) {
try (RestoreSession restoreSession = openSession(metadata.name(), remoteClient, leaderShardId, indexShard, recoveryState)) {
restoreSession.restoreFiles();
updateMappings(remoteClient, leaderIndex, restoreSession.mappingVersion, client, indexShard.routingEntry().index());
} catch (Exception e) {
throw new IndexShardRestoreFailedException(indexShard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e);
} finally {
logger.trace("{} canceling background renewal of retention lease [{}] at the end of restore", shardId, retentionLeaseId);
renewable.cancel();
}
}
private void createEmptyStore(final IndexShard indexShard, final ShardId shardId) {
final Store store = indexShard.store();
store.incRef();
try {
store.createEmpty(indexShard.indexSettings().getIndexMetaData().getCreationVersion().luceneVersion);
} catch (final EngineException | IOException e) {
throw new IndexShardRecoveryException(shardId, "failed to create empty store", e);
} finally {
store.decRef();
}
}
void acquireRetentionLeaseOnLeader(
final ShardId shardId,
final String retentionLeaseId,
final ShardId leaderShardId,
final Client remoteClient) {
logger.trace(
() -> new ParameterizedMessage("{} requesting leader to add retention lease [{}]", shardId, retentionLeaseId));
final Optional<RetentionLeaseAlreadyExistsException> maybeAddAlready =
syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient);
maybeAddAlready.ifPresent(addAlready -> {
logger.trace(() -> new ParameterizedMessage(
"{} retention lease [{}] already exists, requesting a renewal",
shardId,
retentionLeaseId),
addAlready);
final Optional<RetentionLeaseNotFoundException> maybeRenewNotFound =
syncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient);
maybeRenewNotFound.ifPresent(renewNotFound -> {
logger.trace(() -> new ParameterizedMessage(
"{} retention lease [{}] not found while attempting to renew, requesting a final add",
shardId,
retentionLeaseId),
renewNotFound);
final Optional<RetentionLeaseAlreadyExistsException> maybeFallbackAddAlready =
syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient);
maybeFallbackAddAlready.ifPresent(fallbackAddAlready -> {
/*
* At this point we tried to add the lease and the retention lease already existed. By the time we tried to renew the
* lease, it expired or was removed. We tried to add the lease again and it already exists? Bail.
*/
assert false : fallbackAddAlready;
throw fallbackAddAlready;
});
});
});
}
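// synchronously add a retention lease on the leader, returning (rather than throwing) if the lease already exists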
private Optional<RetentionLeaseAlreadyExistsException> syncAddRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient) {
try {
final PlainActionFuture<RetentionLeaseActions.Response> response = new PlainActionFuture<>();
asyncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response);
response.actionGet(ccrSettings.getRecoveryActionTimeout());
return Optional.empty();
} catch (final RetentionLeaseAlreadyExistsException e) {
return Optional.of(e);
}
}
private void asyncAddRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient,
final ActionListener<RetentionLeaseActions.Response> listener) {
final RetentionLeaseActions.AddRequest request =
new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr");
remoteClient.execute(RetentionLeaseActions.Add.INSTANCE, request, listener);
}
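// synchronously renew a retention lease on the leader, returning (rather than throwing) if the lease is not found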
private Optional<RetentionLeaseNotFoundException> syncRenewRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient) {
try {
final PlainActionFuture<RetentionLeaseActions.Response> response = new PlainActionFuture<>();
asyncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response);
response.actionGet(ccrSettings.getRecoveryActionTimeout());
return Optional.empty();
} catch (final RetentionLeaseNotFoundException e) {
return Optional.of(e);
}
}
private void asyncRenewRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final Client remoteClient,
final ActionListener<RetentionLeaseActions.Response> listener) {
final RetentionLeaseActions.RenewRequest request =
new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr");
remoteClient.execute(RetentionLeaseActions.Renew.INSTANCE, request, listener);
}
// this setting is intentionally not registered, it is only used in tests
public static final Setting<TimeValue> RETENTION_LEASE_RENEW_INTERVAL_SETTING =
Setting.timeSetting(
"index.ccr.retention_lease.renew_interval",
new TimeValue(5, TimeUnit.MINUTES),
new TimeValue(0, TimeUnit.MILLISECONDS),
Setting.Property.Dynamic,
Setting.Property.IndexScope);
@Override
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId leaderShardId) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
@ -330,7 +487,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
}
}
private RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard,
RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard,
RecoveryState recoveryState) {
String sessionUUID = UUIDs.randomBase64UUID();
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,

View File

@ -7,6 +7,7 @@
package org.elasticsearch.xpack;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
@ -22,8 +23,11 @@ import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@ -35,6 +39,7 @@ import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -59,6 +64,9 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
@ -101,10 +109,12 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING;
import static org.elasticsearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING;
import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.empty;
@ -116,6 +126,10 @@ public abstract class CcrIntegTestCase extends ESTestCase {
private static ClusterGroup clusterGroup;
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.emptyList();
}
@Before
public final void startClusters() throws Exception {
if (clusterGroup != null && reuseClusters()) {
@ -226,7 +240,10 @@ public abstract class CcrIntegTestCase extends ESTestCase {
@Override
public Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(LocalStateCcr.class, CommonAnalysisPlugin.class);
return Stream.concat(
Stream.of(LocalStateCcr.class, CommonAnalysisPlugin.class),
CcrIntegTestCase.this.nodePlugins().stream())
.collect(Collectors.toList());
}
@Override
@ -642,6 +659,61 @@ public abstract class CcrIntegTestCase extends ESTestCase {
return lastKnownCount.get();
}
protected ActionListener<RestoreService.RestoreCompletionResponse> waitForRestore(
final ClusterService clusterService,
final ActionListener<RestoreInfo> listener) {
return new ActionListener<RestoreService.RestoreCompletionResponse>() {
@Override
public void onResponse(RestoreService.RestoreCompletionResponse restoreCompletionResponse) {
if (restoreCompletionResponse.getRestoreInfo() == null) {
final Snapshot snapshot = restoreCompletionResponse.getSnapshot();
final String uuid = restoreCompletionResponse.getUuid();
final ClusterStateListener clusterStateListener = new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent changedEvent) {
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
if (prevEntry == null) {
/*
* When there is a master failure after a restore has been started, this listener might not be registered
* on the current master and as such it might miss some intermediary cluster states due to batching.
* Clean up the listener in that case and acknowledge completion of restore operation to client.
*/
clusterService.removeListener(this);
listener.onResponse(null);
} else if (newEntry == null) {
clusterService.removeListener(this);
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
RestoreInfo ri = new RestoreInfo(prevEntry.snapshot().getSnapshotId().getName(),
prevEntry.indices(),
shards.size(),
shards.size() - RestoreService.failedShards(shards));
logger.debug("restore of [{}] completed", snapshot);
listener.onResponse(ri);
} else {
// restore not completed yet, wait for next cluster state update
}
}
};
clusterService.addListener(clusterStateListener);
} else {
listener.onResponse(restoreCompletionResponse.getRestoreInfo());
}
}
@Override
public void onFailure(Exception t) {
listener.onFailure(t);
}
};
}
static void removeCCRRelatedMetadataFromClusterState(ClusterService clusterService) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("remove-ccr-related-metadata", new ClusterStateUpdateTask() {

View File

@ -16,26 +16,20 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportService;
@ -55,7 +49,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@ -453,51 +446,4 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
assertThat(getResponse.getSource().get("f"), equalTo(value));
}
private ActionListener<RestoreService.RestoreCompletionResponse> waitForRestore(ClusterService clusterService,
ActionListener<RestoreInfo> listener) {
return new ActionListener<RestoreService.RestoreCompletionResponse>() {
@Override
public void onResponse(RestoreService.RestoreCompletionResponse restoreCompletionResponse) {
if (restoreCompletionResponse.getRestoreInfo() == null) {
final Snapshot snapshot = restoreCompletionResponse.getSnapshot();
final String uuid = restoreCompletionResponse.getUuid();
ClusterStateListener clusterStateListener = new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent changedEvent) {
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
if (prevEntry == null) {
// When there is a master failure after a restore has been started, this listener might not be registered
// on the current master and as such it might miss some intermediary cluster states due to batching.
// Clean up listener in that case and acknowledge completion of restore operation to client.
clusterService.removeListener(this);
listener.onResponse(null);
} else if (newEntry == null) {
clusterService.removeListener(this);
ImmutableOpenMap<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
RestoreInfo ri = new RestoreInfo(prevEntry.snapshot().getSnapshotId().getName(),
prevEntry.indices(),
shards.size(),
shards.size() - RestoreService.failedShards(shards));
logger.debug("restore of [{}] completed", snapshot);
listener.onResponse(ri);
} else {
// restore not completed yet, wait for next cluster state update
}
}
};
clusterService.addListener(clusterStateListener);
} else {
listener.onResponse(restoreCompletionResponse.getRestoreInfo());
}
}
@Override
public void onFailure(Exception t) {
listener.onFailure(t);
}
};
}
}

View File

@ -0,0 +1,407 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
public class CcrRetentionLeaseIT extends CcrIntegTestCase {
public static final class RetentionLeaseRenewIntervalSettingPlugin extends Plugin {
@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(CcrRepository.RETENTION_LEASE_RENEW_INTERVAL_SETTING);
}
}
public static final class RetentionLeaseSyncIntervalSettingPlugin extends Plugin {
@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING);
}
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Stream.concat(
super.nodePlugins().stream(),
Stream.of(RetentionLeaseRenewIntervalSettingPlugin.class, RetentionLeaseSyncIntervalSettingPlugin.class))
.collect(Collectors.toList());
}
private final IndicesOptions indicesOptions = IndicesOptions.strictSingleIndexNoExpandForbidClosed();
private RestoreSnapshotRequest setUpRestoreSnapshotRequest(
final String leaderIndex,
final int numberOfShards,
final int numberOfReplicas,
final String followerIndex,
final int numberOfDocuments) throws IOException {
final ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
final String chunkSize = new ByteSizeValue(randomFrom(4, 128, 1024), ByteSizeUnit.KB).getStringRep();
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), chunkSize));
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
final String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
final Map<String, String> additionalSettings = new HashMap<>();
additionalSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true");
additionalSettings.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200).getStringRep());
final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalSettings);
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen(leaderIndex);
logger.info("indexing [{}] docs", numberOfDocuments);
for (int i = 0; i < numberOfDocuments; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
leaderClient().prepareIndex(leaderIndex, "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
if (rarely()) {
leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get();
}
}
leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get();
final Settings.Builder settingsBuilder = Settings.builder()
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex)
.put(CcrRepository.RETENTION_LEASE_RENEW_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200))
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
return new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST)
.indexSettings(settingsBuilder)
.indices(leaderIndex)
.indicesOptions(indicesOptions)
.renamePattern("^(.*)$")
.renameReplacement(followerIndex)
.masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS));
}
public void testRetentionLeaseIsTakenAtTheStartOfRecovery() throws Exception {
final String leaderIndex = "leader";
final int numberOfShards = randomIntBetween(1, 3);
final int numberOfReplicas = between(0, 1);
final String followerIndex = "follower";
final int numberOfDocuments = scaledRandomIntBetween(1, 8192);
final RestoreSnapshotRequest restoreRequest =
setUpRestoreSnapshotRequest(leaderIndex, numberOfShards, numberOfReplicas, followerIndex, numberOfDocuments);
final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
final PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
final ClusterStateResponse leaderIndexClusterState =
leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get();
final String leaderUUID = leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID();
// ensure that a retention lease has been put in place on each shard
assertBusy(() -> {
final IndicesStatsResponse stats =
leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
assertNotNull(stats.getShards());
assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
final List<ShardStats> shardStats = getShardStats(stats);
for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases();
assertThat(currentRetentionLeases.leases(), hasSize(1));
final ClusterStateResponse followerIndexClusterState =
followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get();
final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID();
final RetentionLease retentionLease =
currentRetentionLeases.leases().iterator().next();
assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID)));
}
});
final RestoreInfo restoreInfo = future.actionGet();
assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards());
assertEquals(0, restoreInfo.failedShards());
for (int i = 0; i < numberOfDocuments; ++i) {
assertExpectedDocument(followerIndex, i);
}
}
public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception {
final String leaderIndex = "leader";
final int numberOfShards = randomIntBetween(1, 3);
final int numberOfReplicas = between(0, 1);
final String followerIndex = "follower";
final int numberOfDocuments = scaledRandomIntBetween(1, 8192);
final RestoreSnapshotRequest restoreRequest =
setUpRestoreSnapshotRequest(leaderIndex, numberOfShards, numberOfReplicas, followerIndex, numberOfDocuments);
final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
final CountDownLatch latch = new CountDownLatch(1);
// block the recovery from completing; this ensures the background renewal is still running
final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get();
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getDataNodes().values()) {
final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
final ClusterStateResponse leaderClusterState = leaderClient().admin().cluster().prepareState().clear().setNodes(true).get();
for (final ObjectCursor<DiscoveryNode> receiverNode : leaderClusterState.getState().nodes().getDataNodes().values()) {
final MockTransportService receiverTransportService =
(MockTransportService) getLeaderCluster().getInstance(TransportService.class, receiverNode.value.getName());
senderTransportService.addSendBehavior(receiverTransportService,
(connection, requestId, action, request, options) -> {
if (ClearCcrRestoreSessionAction.NAME.equals(action)) {
try {
latch.await();
} catch (final InterruptedException e) {
fail(e.toString());
}
}
connection.sendRequest(requestId, action, request, options);
});
}
}
final PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
final ClusterStateResponse leaderIndexClusterState =
leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get();
final String leaderUUID = leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID();
try {
// ensure that a retention lease has been put in place on each shard, and grab a copy of them
final List<RetentionLeases> retentionLeases = new ArrayList<>();
assertBusy(() -> {
retentionLeases.clear();
final IndicesStatsResponse stats =
leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
assertNotNull(stats.getShards());
assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
final List<ShardStats> shardStats = getShardStats(stats);
for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases();
assertThat(currentRetentionLeases.leases(), hasSize(1));
final ClusterStateResponse followerIndexClusterState =
followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get();
final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID();
final RetentionLease retentionLease =
currentRetentionLeases.leases().iterator().next();
assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID)));
retentionLeases.add(currentRetentionLeases);
}
});
// now ensure that the retention leases are being renewed
assertBusy(() -> {
final IndicesStatsResponse stats =
leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
assertNotNull(stats.getShards());
assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
final List<ShardStats> shardStats = getShardStats(stats);
for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases();
assertThat(currentRetentionLeases.leases(), hasSize(1));
final ClusterStateResponse followerIndexClusterState =
followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get();
final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID();
final RetentionLease retentionLease =
currentRetentionLeases.leases().iterator().next();
assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID)));
// we assert that retention leases are being renewed by an increase in the timestamp
assertThat(retentionLease.timestamp(), greaterThan(retentionLeases.get(i).leases().iterator().next().timestamp()));
}
});
latch.countDown();
} finally {
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getDataNodes().values()) {
final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
senderTransportService.clearAllRules();
}
}
final RestoreInfo restoreInfo = future.actionGet();
assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards());
assertEquals(0, restoreInfo.failedShards());
for (int i = 0; i < numberOfDocuments; i++) {
assertExpectedDocument(followerIndex, i);
}
}
public void testRetentionLeasesAreNotBeingRenewedAfterRecoveryCompletes() throws Exception {
final String leaderIndex = "leader";
final int numberOfShards = randomIntBetween(1, 3);
final int numberOfReplicas = between(0, 1);
final String followerIndex = "follower";
final int numberOfDocuments = scaledRandomIntBetween(1, 8192);
final RestoreSnapshotRequest restoreRequest =
setUpRestoreSnapshotRequest(leaderIndex, numberOfShards, numberOfReplicas, followerIndex, numberOfDocuments);
final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
final PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
final RestoreInfo restoreInfo = future.actionGet();
final long start = System.nanoTime();
final ClusterStateResponse leaderIndexClusterState =
leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get();
final String leaderUUID = leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID();
// sample the leases after recovery
final List<RetentionLeases> retentionLeases = new ArrayList<>();
assertBusy(() -> {
retentionLeases.clear();
final IndicesStatsResponse stats =
leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
assertNotNull(stats.getShards());
assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
final List<ShardStats> shardStats = getShardStats(stats);
for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases();
assertThat(currentRetentionLeases.leases(), hasSize(1));
final ClusterStateResponse followerIndexClusterState =
followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get();
final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID();
final RetentionLease retentionLease =
currentRetentionLeases.leases().iterator().next();
final String expectedRetentionLeaseId = retentionLeaseId(
getFollowerCluster().getClusterName(),
new Index(followerIndex, followerUUID),
getLeaderCluster().getClusterName(),
new Index(leaderIndex, leaderUUID));
assertThat(retentionLease.id(), equalTo(expectedRetentionLeaseId));
retentionLeases.add(currentRetentionLeases);
}
});
final long end = System.nanoTime();
Thread.sleep(Math.max(0, randomIntBetween(2, 4) * 200 - TimeUnit.NANOSECONDS.toMillis(end - start)));
// now ensure that the retention leases are the same
assertBusy(() -> {
final IndicesStatsResponse stats =
leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
assertNotNull(stats.getShards());
assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas)));
final List<ShardStats> shardStats = getShardStats(stats);
for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) {
final RetentionLeases currentRetentionLeases = shardStats.get(i).getRetentionLeaseStats().retentionLeases();
assertThat(currentRetentionLeases.leases(), hasSize(1));
final ClusterStateResponse followerIndexClusterState =
followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get();
final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID();
final RetentionLease retentionLease =
currentRetentionLeases.leases().iterator().next();
assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID)));
// we assert that retention leases are not being renewed by checking that the timestamps are unchanged
assertThat(retentionLease.timestamp(), equalTo(retentionLeases.get(i).leases().iterator().next().timestamp()));
}
});
assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards());
assertEquals(0, restoreInfo.failedShards());
for (int i = 0; i < numberOfDocuments; ++i) {
assertExpectedDocument(followerIndex, i);
}
}
/**
* Extract the shard stats from an indices stats response, with the stats ordered by shard ID with primaries first. This is to have a
* consistent ordering when comparing two responses.
*
* @param stats the indices stats
* @return the shard stats in sorted order with (shard ID, primary) as the sort key
*/
private List<ShardStats> getShardStats(final IndicesStatsResponse stats) {
return Arrays.stream(stats.getShards())
.sorted((s, t) -> {
if (s.getShardRouting().shardId().id() == t.getShardRouting().shardId().id()) {
return Boolean.compare(s.getShardRouting().primary(), t.getShardRouting().primary());
} else {
return Integer.compare(s.getShardRouting().shardId().id(), t.getShardRouting().shardId().id());
}
})
.collect(Collectors.toList());
}
private String getRetentionLeaseId(String followerIndex, String followerUUID, String leaderIndex, String leaderUUID) {
return retentionLeaseId(
getFollowerCluster().getClusterName(),
new Index(followerIndex, followerUUID),
getLeaderCluster().getClusterName(),
new Index(leaderIndex, leaderUUID));
}
private void assertExpectedDocument(final String followerIndex, final int value) {
final GetResponse getResponse = followerClient().prepareGet(followerIndex, "doc", Integer.toString(value)).get();
assertTrue("doc with id [" + value + "] is missing", getResponse.isExists());
assertTrue(getResponse.getSource().containsKey("f"));
assertThat(getResponse.getSource().get("f"), equalTo(value));
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ccr;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;
@ -15,6 +16,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@ -43,6 +45,8 @@ import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.cluster.health.ClusterShardHealth;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.bytes.BytesReference;
@ -53,8 +57,10 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.rest.RestStatus;
@ -87,11 +93,13 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@ -982,6 +990,67 @@ public class IndexFollowingIT extends CcrIntegTestCase {
}
public void testIndexFallBehind() throws Exception {
runFallBehindTest(
() -> {
// we have to remove the retention leases on the leader shards to ensure the follower falls behind
final ClusterStateResponse followerIndexClusterState =
followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices("index2").get();
final String followerUUID = followerIndexClusterState.getState().metaData().index("index2").getIndexUUID();
final ClusterStateResponse leaderIndexClusterState =
leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices("index1").get();
final String leaderUUID = leaderIndexClusterState.getState().metaData().index("index1").getIndexUUID();
final RoutingTable leaderRoutingTable = leaderClient()
.admin()
.cluster()
.prepareState()
.clear()
.setIndices("index1")
.setRoutingTable(true)
.get()
.getState()
.routingTable();
final String retentionLeaseId = retentionLeaseId(
getFollowerCluster().getClusterName(),
new Index("index2", followerUUID),
getLeaderCluster().getClusterName(),
new Index("index1", leaderUUID));
for (final ObjectCursor<IndexShardRoutingTable> shardRoutingTable
: leaderRoutingTable.index("index1").shards().values()) {
final ShardId shardId = shardRoutingTable.value.shardId();
leaderClient().execute(
RetentionLeaseActions.Remove.INSTANCE,
new RetentionLeaseActions.RemoveRequest(shardId, retentionLeaseId))
.get();
}
},
exceptions -> assertThat(exceptions.size(), greaterThan(0)));
}
public void testIndexDoesNotFallBehind() throws Exception {
runFallBehindTest(
() -> {},
exceptions -> assertThat(exceptions.size(), equalTo(0)));
}
/**
* Runs a fall behind test. In this test, we construct a situation where a follower is paused. While the follower is paused, we index
* more documents that cause soft deletes on the leader, flush them, and run a force merge. This sets up a situation where the
* operations will not necessarily be there. With retention leases in place, we would actually expect the operations to be there. After
* pausing the follower, the specified callback is executed. This gives a test an opportunity to set up assumptions. For example, a test
* might remove all the retention leases on the leader to set up a situation where the follower will fall behind when it is resumed
* because the operations will no longer be held on the leader. The specified exceptions callback is invoked after resuming the follower
* to give a test an opportunity to assert on the resource not found exceptions (either present or not present).
*
* @param afterPausingFollower the callback to run after pausing the follower
* @param exceptionConsumer the callback to run on a collection of resource not found exceptions after resuming the follower
* @throws Exception if a checked exception is thrown during the test
*/
private void runFallBehindTest(
final CheckedRunnable<Exception> afterPausingFollower,
final Consumer<Collection<ResourceNotFoundException>> exceptionConsumer) throws Exception {
final int numberOfPrimaryShards = randomIntBetween(1, 3);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
@ -1008,6 +1077,8 @@ public class IndexFollowingIT extends CcrIntegTestCase {
pauseFollow("index2");
afterPausingFollower.run();
for (int i = 0; i < numDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i * 2);
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
@ -1031,13 +1102,12 @@ public class IndexFollowingIT extends CcrIntegTestCase {
.map(e -> (ResourceNotFoundException) e)
.filter(e -> e.getMetadataKeys().contains("es.requested_operations_missing"))
.collect(Collectors.toSet());
assertThat(exceptions.size(), greaterThan(0));
exceptionConsumer.accept(exceptions);
});
followerClient().admin().indices().prepareClose("index2").get();
pauseFollow("index2");
final PutFollowAction.Request followRequest2 = putFollow("index1", "index2");
PutFollowAction.Response response2 = followerClient().execute(PutFollowAction.INSTANCE, followRequest2).get();
assertTrue(response2.isFollowIndexCreated());
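    // The close/pause/putFollow sequence above recreates the follower from scratch, so a follower that has fallen behind can
    // still be re-bootstrapped from the leader via the CCR restore (recovery from the remote leader).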

View File

@ -0,0 +1,191 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.repository;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class CcrRepositoryRetentionLeaseTests extends ESTestCase {
public void testWhenRetentionLeaseAlreadyExistsWeTryToRenewIt() {
final RepositoryMetaData repositoryMetaData = mock(RepositoryMetaData.class);
when(repositoryMetaData.name()).thenReturn(CcrRepository.NAME_PREFIX);
final Set<Setting<?>> settings =
Stream.concat(
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(),
CcrSettings.getSettings().stream().filter(Setting::hasNodeScope))
.collect(Collectors.toSet());
final CcrRepository repository = new CcrRepository(
repositoryMetaData,
mock(Client.class),
new CcrLicenseChecker(() -> true, () -> true),
Settings.EMPTY,
new CcrSettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, settings)),
mock(ThreadPool.class));
final ShardId followerShardId = new ShardId(new Index("follower-index-name", "follower-index-uuid"), 0);
final ShardId leaderShardId = new ShardId(new Index("leader-index-name", "leader-index-uuid"), 0);
final String retentionLeaseId =
retentionLeaseId("local-cluster", followerShardId.getIndex(), "remote-cluster", leaderShardId.getIndex());
        // simulate that the retention lease already exists on the leader, and verify that we attempt to renew it
final Client remoteClient = mock(Client.class);
final ArgumentCaptor<RetentionLeaseActions.AddRequest> addRequestCaptor =
ArgumentCaptor.forClass(RetentionLeaseActions.AddRequest.class);
doAnswer(
invocationOnMock -> {
@SuppressWarnings("unchecked") final ActionListener<RetentionLeaseActions.Response> listener =
(ActionListener<RetentionLeaseActions.Response>) invocationOnMock.getArguments()[2];
listener.onFailure(new RetentionLeaseAlreadyExistsException(retentionLeaseId));
return null;
})
.when(remoteClient)
.execute(same(RetentionLeaseActions.Add.INSTANCE), addRequestCaptor.capture(), any());
final ArgumentCaptor<RetentionLeaseActions.RenewRequest> renewRequestCaptor =
ArgumentCaptor.forClass(RetentionLeaseActions.RenewRequest.class);
doAnswer(
invocationOnMock -> {
@SuppressWarnings("unchecked") final ActionListener<RetentionLeaseActions.Response> listener =
(ActionListener<RetentionLeaseActions.Response>) invocationOnMock.getArguments()[2];
listener.onResponse(new RetentionLeaseActions.Response());
return null;
})
.when(remoteClient)
.execute(same(RetentionLeaseActions.Renew.INSTANCE), renewRequestCaptor.capture(), any());
repository.acquireRetentionLeaseOnLeader(followerShardId, retentionLeaseId, leaderShardId, remoteClient);
verify(remoteClient).execute(same(RetentionLeaseActions.Add.INSTANCE), any(RetentionLeaseActions.AddRequest.class), any());
assertThat(addRequestCaptor.getValue().getShardId(), equalTo(leaderShardId));
assertThat(addRequestCaptor.getValue().getId(), equalTo(retentionLeaseId));
assertThat(addRequestCaptor.getValue().getRetainingSequenceNumber(), equalTo(RETAIN_ALL));
assertThat(addRequestCaptor.getValue().getSource(), equalTo("ccr"));
verify(remoteClient).execute(same(RetentionLeaseActions.Renew.INSTANCE), any(RetentionLeaseActions.RenewRequest.class), any());
assertThat(renewRequestCaptor.getValue().getShardId(), equalTo(leaderShardId));
assertThat(renewRequestCaptor.getValue().getId(), equalTo(retentionLeaseId));
assertThat(renewRequestCaptor.getValue().getRetainingSequenceNumber(), equalTo(RETAIN_ALL));
assertThat(renewRequestCaptor.getValue().getSource(), equalTo("ccr"));
verifyNoMoreInteractions(remoteClient);
}
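    // Both tests in this class pin down the add-then-renew fallback in CcrRepository#acquireRetentionLeaseOnLeader. A minimal
    // sketch of the control flow implied by the mocked interactions (the actual implementation may differ in structure):
    //
    //     try {
    //         addRetentionLease(leaderShardId, retentionLeaseId, remoteClient);        // RetentionLeaseActions.Add
    //     } catch (final RetentionLeaseAlreadyExistsException e) {
    //         try {
    //             renewRetentionLease(leaderShardId, retentionLeaseId, remoteClient);  // RetentionLeaseActions.Renew
    //         } catch (final RetentionLeaseNotFoundException e2) {
    //             addRetentionLease(leaderShardId, retentionLeaseId, remoteClient);    // lease expired before the renewal
    //         }
    //     }
    //
    // where addRetentionLease and renewRetentionLease are hypothetical helpers that block on the corresponding action.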
public void testWhenRetentionLeaseExpiresBeforeWeCanRenewIt() {
final RepositoryMetaData repositoryMetaData = mock(RepositoryMetaData.class);
when(repositoryMetaData.name()).thenReturn(CcrRepository.NAME_PREFIX);
final Set<Setting<?>> settings =
Stream.concat(
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(),
CcrSettings.getSettings().stream().filter(Setting::hasNodeScope))
.collect(Collectors.toSet());
final CcrRepository repository = new CcrRepository(
repositoryMetaData,
mock(Client.class),
new CcrLicenseChecker(() -> true, () -> true),
Settings.EMPTY,
new CcrSettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, settings)),
mock(ThreadPool.class));
final ShardId followerShardId = new ShardId(new Index("follower-index-name", "follower-index-uuid"), 0);
final ShardId leaderShardId = new ShardId(new Index("leader-index-name", "leader-index-uuid"), 0);
final String retentionLeaseId =
retentionLeaseId("local-cluster", followerShardId.getIndex(), "remote-cluster", leaderShardId.getIndex());
        // simulate that the retention lease already exists on the leader but expires before we can renew it, and verify that we attempt to add it again
final Client remoteClient = mock(Client.class);
final ArgumentCaptor<RetentionLeaseActions.AddRequest> addRequestCaptor =
ArgumentCaptor.forClass(RetentionLeaseActions.AddRequest.class);
final PlainActionFuture<RetentionLeaseActions.Response> response = new PlainActionFuture<>();
response.onResponse(new RetentionLeaseActions.Response());
doAnswer(
new Answer<Void>() {
final AtomicBoolean firstInvocation = new AtomicBoolean(true);
@Override
public Void answer(final InvocationOnMock invocationOnMock) {
@SuppressWarnings("unchecked") final ActionListener<RetentionLeaseActions.Response> listener =
(ActionListener<RetentionLeaseActions.Response>) invocationOnMock.getArguments()[2];
if (firstInvocation.compareAndSet(true, false)) {
listener.onFailure(new RetentionLeaseAlreadyExistsException(retentionLeaseId));
} else {
listener.onResponse(new RetentionLeaseActions.Response());
}
return null;
}
})
                .when(remoteClient)
                .execute(same(RetentionLeaseActions.Add.INSTANCE), addRequestCaptor.capture(), any());
final ArgumentCaptor<RetentionLeaseActions.RenewRequest> renewRequestCaptor =
ArgumentCaptor.forClass(RetentionLeaseActions.RenewRequest.class);
        doAnswer(
                invocationOnMock -> {
                    @SuppressWarnings("unchecked") final ActionListener<RetentionLeaseActions.Response> listener =
                            (ActionListener<RetentionLeaseActions.Response>) invocationOnMock.getArguments()[2];
                    listener.onFailure(new RetentionLeaseNotFoundException(retentionLeaseId));
                    return null;
                })
                .when(remoteClient)
                .execute(same(RetentionLeaseActions.Renew.INSTANCE), renewRequestCaptor.capture(), any());
repository.acquireRetentionLeaseOnLeader(followerShardId, retentionLeaseId, leaderShardId, remoteClient);
verify(remoteClient, times(2))
.execute(same(RetentionLeaseActions.Add.INSTANCE), any(RetentionLeaseActions.AddRequest.class), any());
assertThat(addRequestCaptor.getValue().getShardId(), equalTo(leaderShardId));
assertThat(addRequestCaptor.getValue().getId(), equalTo(retentionLeaseId));
assertThat(addRequestCaptor.getValue().getRetainingSequenceNumber(), equalTo(RETAIN_ALL));
assertThat(addRequestCaptor.getValue().getSource(), equalTo("ccr"));
verify(remoteClient).execute(same(RetentionLeaseActions.Renew.INSTANCE), any(RetentionLeaseActions.RenewRequest.class), any());
assertThat(renewRequestCaptor.getValue().getShardId(), equalTo(leaderShardId));
assertThat(renewRequestCaptor.getValue().getId(), equalTo(retentionLeaseId));
assertThat(renewRequestCaptor.getValue().getRetainingSequenceNumber(), equalTo(RETAIN_ALL));
assertThat(renewRequestCaptor.getValue().getSource(), equalTo("ccr"));
verifyNoMoreInteractions(remoteClient);
}
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.core.security.authz.privilege;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction;
import org.elasticsearch.index.seqno.RetentionLeaseSyncAction;
import org.elasticsearch.transport.TransportActionProxy;
@ -29,6 +30,8 @@ public final class SystemPrivilege extends Privilege {
"indices:admin/seq_no/global_checkpoint_sync*", // needed for global checkpoint syncs
RetentionLeaseSyncAction.ACTION_NAME + "*", // needed for retention lease syncs
RetentionLeaseBackgroundSyncAction.ACTION_NAME + "*", // needed for background retention lease syncs
RetentionLeaseActions.Add.ACTION_NAME + "*", // needed for CCR to add retention leases
RetentionLeaseActions.Renew.ACTION_NAME + "*", // needed for CCR to renew retention leases
"indices:admin/settings/update" // needed for DiskThresholdMonitor.markIndicesReadOnly
);
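    // Note: the trailing "*" on the action names above presumably also matches the shard-level variants (for example
    // "indices:admin/seq_no/add_retention_lease[s]"), which is what PrivilegeTests asserts below.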

View File

@ -133,6 +133,10 @@ public class PrivilegeTests extends ESTestCase {
assertThat(predicate.test("indices:admin/seq_no/retention_lease_background_sync"), is(true));
assertThat(predicate.test("indices:admin/seq_no/retention_lease_background_sync[p]"), is(true));
assertThat(predicate.test("indices:admin/seq_no/retention_lease_background_sync[r]"), is(true));
assertThat(predicate.test("indices:admin/seq_no/add_retention_lease"), is(true));
assertThat(predicate.test("indices:admin/seq_no/add_retention_lease[s]"), is(true));
assertThat(predicate.test("indices:admin/seq_no/renew_retention_lease"), is(true));
assertThat(predicate.test("indices:admin/seq_no/renew_retention_lease[s]"), is(true));
assertThat(predicate.test("indices:admin/settings/update"), is(true));
assertThat(predicate.test("indices:admin/settings/foo"), is(false));
}

View File

@ -278,6 +278,7 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
}
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37165")
public void testUnfollowInjectedBeforeShrink() throws Exception {
final String indexName = "shrink-test";
final String shrunkenIndexName = "shrink-" + indexName;

View File

@ -261,6 +261,8 @@ public class AuthorizationServiceTests extends ESTestCase {
"indices:admin/seq_no/global_checkpoint_sync",
"indices:admin/seq_no/retention_lease_sync",
"indices:admin/seq_no/retention_lease_background_sync",
"indices:admin/seq_no/add_retention_lease",
"indices:admin/seq_no/renew_retention_lease",
"indices:admin/settings/update" };
for (String action : actions) {
authorize(authentication, action, request);