Create retention leases file during recovery (#39359)

Today we load the shard history retention leases from disk whenever opening the
engine, and treat a missing file as an empty set of leases. However, in some
cases this is inappropriate: we might be restoring from a snapshot (if the
target index already exists then there may be leases on disk) or
force-allocating a stale primary, and in neither case does it make sense to
restore the retention leases from disk.
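
Concretely, the existing load-on-open behaviour described above corresponds to the fallback in ReplicationTracker#loadRetentionLeases shown in the diff below; as a rough sketch:

    // Sketch of the pre-existing fallback (see ReplicationTracker#loadRetentionLeases below):
    // a missing retention leases file on disk is silently treated as an empty set of leases.
    RetentionLeases leases = RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);
    if (leases == null) {
        leases = RetentionLeases.EMPTY;
    }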

With this change we write an empty retention leases file during recovery,
except for the following cases:

- During peer recovery the on-disk leases may be accurate and could be needed
  if the recovery target is made into a primary.

- During recovery from an existing store, as long as we are not
  force-allocating a stale primary.
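
The diff below encodes these exceptions in a new RecoverySource#expectEmptyRetentionLeases() hook. As a rough, illustrative sketch of the resulting per-source behaviour (this is not the actual class hierarchy; shouldBootstrapNewHistoryUUID() is the existing hook that StoreRecovery already consults):

    // Illustrative summary only; the real logic lives in the RecoverySource overrides shown below.
    static boolean expectEmptyRetentionLeases(RecoverySource source) {
        if (source instanceof RecoverySource.PeerRecoverySource) {
            return false; // on-disk leases may still be needed if this copy is later promoted to primary
        }
        if (source instanceof RecoverySource.ExistingStoreRecoverySource) {
            // true only when force-allocating a stale primary, which bootstraps a new history UUID
            return source.shouldBootstrapNewHistoryUUID();
        }
        return true; // empty-store, snapshot and local-shards recoveries start with an empty leases file
    }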

Relates #37165
David Turner 2019-03-15 07:36:05 +00:00
parent 8d2184b315
commit a323132503
8 changed files with 150 additions and 1 deletion

RecoverySource.java

@@ -97,6 +97,10 @@ public abstract class RecoverySource implements Writeable, ToXContentObject {
        return false;
    }

    public boolean expectEmptyRetentionLeases() {
        return true;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
@@ -181,6 +185,11 @@ public abstract class RecoverySource implements Writeable, ToXContentObject {
        public String toString() {
            return "existing store recovery; bootstrap_history_uuid=" + bootstrapNewHistoryUUID;
        }

        @Override
        public boolean expectEmptyRetentionLeases() {
            return bootstrapNewHistoryUUID;
        }
    }

    /**
@@ -317,5 +326,10 @@ public abstract class RecoverySource implements Writeable, ToXContentObject {
        public String toString() {
            return "peer recovery";
        }

        @Override
        public boolean expectEmptyRetentionLeases() {
            return false;
        }
    }
}

ReplicationTracker.java

@@ -330,6 +330,9 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
     */
    public RetentionLeases loadRetentionLeases(final Path path) throws IOException {
        final RetentionLeases retentionLeases = RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);

        // TODO after backporting we expect this never to happen in 8.x, so adjust this to throw an exception instead.
        assert Version.CURRENT.major <= 8 : "throw an exception instead of returning EMPTY on null";
        if (retentionLeases == null) {
            return RetentionLeases.EMPTY;
        }
@@ -355,6 +358,11 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
        }
    }

    public boolean assertRetentionLeasesPersisted(final Path path) throws IOException {
        assert RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path) != null;
        return true;
    }

    public static class CheckpointState implements Writeable {

        /**

IndexShard.java

@@ -1434,6 +1434,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
        final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
        replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
        updateRetentionLeasesOnReplica(loadRetentionLeases());
        assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty()
            : "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()
            + "] but got " + getRetentionLeases();
        trimUnsafeCommits();
        synchronized (mutex) {
            verifyNotClosed();
@@ -2029,6 +2032,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
        replicationTracker.persistRetentionLeases(path.getShardStatePath());
    }

    public boolean assertRetentionLeasesPersisted() throws IOException {
        return replicationTracker.assertRetentionLeasesPersisted(path.getShardStatePath());
    }

    /**
     * Syncs the current retention leases to all replicas.
     */

StoreRecovery.java

@@ -401,9 +401,11 @@ final class StoreRecovery {
                final String translogUUID = Translog.createEmptyTranslog(
                    indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
                store.associateIndexWithNewTranslog(translogUUID);
                writeEmptyRetentionLeasesFile(indexShard);
            } else if (indexShouldExists) {
                if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
                    store.bootstrapNewHistory();
                    writeEmptyRetentionLeasesFile(indexShard);
                }
                // since we recover from local, just fill the files and size
                try {
@@ -420,6 +422,7 @@
                    indexShard.shardPath().resolveTranslog(), SequenceNumbers.NO_OPS_PERFORMED, shardId,
                    indexShard.getPendingPrimaryTerm());
                store.associateIndexWithNewTranslog(translogUUID);
                writeEmptyRetentionLeasesFile(indexShard);
            }
            indexShard.openEngineAndRecoverFromTranslog();
            indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
@@ -432,6 +435,12 @@
        }
    }

    private static void writeEmptyRetentionLeasesFile(IndexShard indexShard) throws IOException {
        assert indexShard.getRetentionLeases().leases().isEmpty() : indexShard.getRetentionLeases(); // not loaded yet
        indexShard.persistRetentionLeases();
        assert indexShard.loadRetentionLeases().leases().isEmpty();
    }

    private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState.Index index) throws IOException {
        final Directory directory = store.directory();
        for (String name : Lucene.files(si)) {
@@ -471,6 +480,7 @@
                indexShard.shardPath().resolveTranslog(), localCheckpoint, shardId, indexShard.getPendingPrimaryTerm());
            store.associateIndexWithNewTranslog(translogUUID);
            assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
            writeEmptyRetentionLeasesFile(indexShard);
            indexShard.openEngineAndRecoverFromTranslog();
            indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
            indexShard.finalizeRecovery();

RecoveryTarget.java

@@ -414,6 +414,15 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
                indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId,
                indexShard.getPendingPrimaryTerm());
            store.associateIndexWithNewTranslog(translogUUID);

            if (indexShard.getRetentionLeases().leases().isEmpty()) {
                // if empty, may be a fresh IndexShard, so write an empty leases file to disk
                indexShard.persistRetentionLeases();
                assert indexShard.loadRetentionLeases().leases().isEmpty();
            } else {
                assert indexShard.assertRetentionLeasesPersisted();
            }

        } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
            // this is a fatal exception at this stage.
            // this means we transferred files from the remote that have not be checksummed and they are

RetentionLeaseIT.java

@@ -19,9 +19,11 @@
package org.elasticsearch.index.seqno;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@@ -31,6 +33,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
@@ -45,6 +48,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -53,6 +57,7 @@ import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.contains;
@@ -388,6 +393,29 @@ public class RetentionLeaseIT extends ESIntegTestCase {
            currentRetentionLeases.put(id, primary.renewRetentionLease(id, retainingSequenceNumber, source));
        }

        // Cause some recoveries to fail to ensure that retention leases are handled properly when retrying a recovery
        assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder()
            .put(INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), "100ms")));
        final Semaphore recoveriesToDisrupt = new Semaphore(scaledRandomIntBetween(0, 4));
        final MockTransportService primaryTransportService
            = (MockTransportService) internalCluster().getInstance(TransportService.class, primaryShardNodeName);
        primaryTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
            if (action.equals(PeerRecoveryTargetService.Actions.FINALIZE) && recoveriesToDisrupt.tryAcquire()) {
                if (randomBoolean()) {
                    // return a ConnectTransportException to the START_RECOVERY action
                    final TransportService replicaTransportService
                        = internalCluster().getInstance(TransportService.class, connection.getNode().getName());
                    final DiscoveryNode primaryNode = primaryTransportService.getLocalNode();
                    replicaTransportService.disconnectFromNode(primaryNode);
                    replicaTransportService.connectToNode(primaryNode);
                } else {
                    // return an exception to the FINALIZE action
                    throw new ElasticsearchException("failing recovery for test purposes");
                }
            }
            connection.sendRequest(requestId, action, request, options);
        });

        // now allow the replicas to be allocated and wait for recovery to finalize
        allowNodes("index", 1 + numberOfReplicas);
        ensureGreen("index");

IndexShardRetentionLeaseTests.java

@@ -265,6 +265,20 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
            } finally {
                closeShards(recoveredShard);
            }

            // we should not recover retention leases when force-allocating a stale primary
            final IndexShard forceRecoveredShard = reinitShard(
                indexShard,
                ShardRoutingHelper.initWithSameId(indexShard.routingEntry(),
                    RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE));
            try {
                recoverShardFromStore(forceRecoveredShard);
                final RetentionLeases recoveredRetentionLeases = forceRecoveredShard.getEngine().config().retentionLeasesSupplier().get();
                assertThat(recoveredRetentionLeases.leases(), empty());
                assertThat(recoveredRetentionLeases.version(), equalTo(0L));
            } finally {
                closeShards(forceRecoveredShard);
            }
        } finally {
            closeShards(indexShard);
        }

ESBlobStoreRepositoryIntegTestCase.java

@@ -23,9 +23,15 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotReq
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
@@ -43,6 +49,7 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
@@ -87,8 +94,8 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
        int[] docCounts = new int[indexCount];
        String[] indexNames = generateRandomNames(indexCount);
        for (int i = 0; i < indexCount; i++) {
-           logger.info("--> create random index {} with {} records", indexNames[i], docCounts[i]);
            docCounts[i] = iterations(10, 1000);
+           logger.info("--> create random index {} with {} records", indexNames[i], docCounts[i]);
            addRandomDocuments(indexNames[i], docCounts[i]);
            assertHitCount(client().prepareSearch(indexNames[i]).setSize(0).get(), docCounts[i]);
        }
@@ -267,6 +274,58 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
        }
    }

    public void testRetentionLeasesClearedOnRestore() throws Exception {
        final String repoName = randomAsciiName();
        logger.info("--> creating repository {}", repoName);
        createAndCheckTestRepository(repoName);

        final String indexName = randomAsciiName();
        final int shardCount = randomIntBetween(1, 5);
        assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(
            Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, shardCount)).get());
        final ShardId shardId = new ShardId(resolveIndex(indexName), randomIntBetween(0, shardCount - 1));

        final int snapshotDocCount = iterations(10, 1000);
        logger.info("--> indexing {} docs into {}", snapshotDocCount, indexName);
        addRandomDocuments(indexName, snapshotDocCount);
        assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount);

        final String leaseId = randomAsciiName();
        logger.info("--> adding retention lease with id {} to {}", leaseId, shardId);
        client().execute(RetentionLeaseActions.Add.INSTANCE, new RetentionLeaseActions.AddRequest(
            shardId, leaseId, RETAIN_ALL, "test")).actionGet();

        final ShardStats shardStats = Arrays.stream(client().admin().indices().prepareStats(indexName).get().getShards())
            .filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get();
        final RetentionLeases retentionLeases = shardStats.getRetentionLeaseStats().retentionLeases();
        assertTrue(shardStats + ": " + retentionLeases, retentionLeases.contains(leaseId));

        final String snapshotName = randomAsciiName();
        logger.info("--> create snapshot {}:{}", repoName, snapshotName);
        assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
            .setWaitForCompletion(true).setIndices(indexName));

        if (randomBoolean()) {
            final int extraDocCount = iterations(10, 1000);
            logger.info("--> indexing {} extra docs into {}", extraDocCount, indexName);
            addRandomDocuments(indexName, extraDocCount);
        }

        logger.info("--> close index {}", indexName);
        assertAcked(client().admin().indices().prepareClose(indexName));

        logger.info("--> restore index {} from snapshot", indexName);
        assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotName).setWaitForCompletion(true));
        ensureGreen();

        assertHitCount(client().prepareSearch(indexName).setSize(0).get(), snapshotDocCount);

        final RetentionLeases restoredRetentionLeases = Arrays.stream(client().admin().indices().prepareStats(indexName).get()
            .getShards()).filter(s -> s.getShardRouting().shardId().equals(shardId)).findFirst().get()
            .getRetentionLeaseStats().retentionLeases();
        assertFalse(restoredRetentionLeases.toString() + " has no " + leaseId, restoredRetentionLeases.contains(leaseId));
    }

    protected void addRandomDocuments(String name, int numDocs) throws ExecutionException, InterruptedException {
        IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs];
        for (int i = 0; i < numDocs; i++) {