Migrate peer recovery from translog to retention lease (#49448)

Since 7.4, we have switched from the translog to Lucene as the source of
history for peer recoveries. However, this reduces the likelihood of
operation-based recoveries when performing a full cluster restart from a
pre-7.4 version, because the existing copies do not have peer recovery
retention leases (PRRLs).

To remedy this, we fall back to using the translog in peer recoveries if
the recovering replica does not have a peer recovery retention lease and
the replication group hasn't fully migrated to PRRLs.

Relates #45136
Nhat Nguyen 2019-12-13 13:56:50 -05:00
parent c151a75dfe
commit df46848fb0
20 changed files with 447 additions and 118 deletions
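Conceptually, the fallback amounts to picking a history source per recovery. The sketch below is illustrative only: the helper class and method are hypothetical, but Engine.HistorySource and IndexShard#useRetentionLeasesInPeerRecovery are introduced by this change, and the real decision is made inline in RecoverySourceHandler further down in this diff.

// Illustrative sketch only (not part of the patch).
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.shard.IndexShard;

final class HistorySourceChooser {
    // Use Lucene (soft-deletes) history when the replication group has fully migrated
    // to PRRLs or the recovering replica already holds a lease; otherwise fall back
    // to the translog so copies from pre-7.4 can still do operation-based recoveries.
    static Engine.HistorySource choose(IndexShard primary, RetentionLease replicaLease) {
        if (primary.useRetentionLeasesInPeerRecovery() || replicaLease != null) {
            return Engine.HistorySource.INDEX;
        }
        return Engine.HistorySource.TRANSLOG;
    }
}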


@ -35,6 +35,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.RetentionLeaseUtils;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.rest.action.document.RestGetAction;
@ -1267,6 +1268,12 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase {
}
}
private void indexDocument(String id) throws IOException {
final Request indexRequest = new Request("POST", "/" + index + "/" + "_doc/" + id);
indexRequest.setJsonEntity(Strings.toString(JsonXContent.contentBuilder().startObject().field("f", "v").endObject()));
assertOK(client().performRequest(indexRequest));
}
private int countOfIndexedRandomDocuments() throws IOException {
return Integer.parseInt(loadInfoDocument("count"));
}
@ -1362,4 +1369,63 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase {
RetentionLeaseUtils.assertAllCopiesHavePeerRecoveryRetentionLeases(client(), index);
}
}
/**
* Tests that, with or without soft-deletes, we perform an operation-based recovery if there are some
* but not too many uncommitted documents (i.e., fewer than 10% of the committed documents, or covered by the
* extra translog) before we restart the cluster. This is important when we move from translog-based to
* retention-lease-based peer recoveries.
*/
public void testOperationBasedRecovery() throws Exception {
if (isRunningAgainstOldCluster()) {
createIndex(index, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean())
.build());
ensureGreen(index);
int committedDocs = randomIntBetween(100, 200);
for (int i = 0; i < committedDocs; i++) {
indexDocument(Integer.toString(i));
if (rarely()) {
flush(index, randomBoolean());
}
}
flush(index, true);
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
// less than 10% of the committed docs (see IndexSettings#FILE_BASED_RECOVERY_THRESHOLD_SETTING).
int uncommittedDocs = randomIntBetween(0, (int) (committedDocs * 0.1));
for (int i = 0; i < uncommittedDocs; i++) {
final String id = Integer.toString(randomIntBetween(1, 100));
indexDocument(id);
}
} else {
ensureGreen(index);
assertNoFileBasedRecovery(index, n -> true);
}
}
/**
* Verifies that once all shard copies are on the new version, we turn off translog retention for indices with soft-deletes.
*/
public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception {
if (isRunningAgainstOldCluster()) {
createIndex(index, Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
ensureGreen(index);
int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
indexDocument(Integer.toString(randomIntBetween(1, 100)));
if (rarely()) {
flush(index, randomBoolean());
}
}
} else {
ensureGreen(index);
flush(index, true);
assertEmptyTranslog(index);
}
}
}


@ -516,10 +516,10 @@ public class RecoveryIT extends AbstractRollingTestCase {
switch (CLUSTER_TYPE) {
case OLD: break;
case MIXED:
assertNoFileBasedRecovery(indexName, s -> s.startsWith(CLUSTER_NAME + "-0"));
assertNoopRecoveries(indexName, s -> s.startsWith(CLUSTER_NAME + "-0"));
break;
case UPGRADED:
assertNoFileBasedRecovery(indexName, s -> s.startsWith(CLUSTER_NAME));
assertNoopRecoveries(indexName, s -> s.startsWith(CLUSTER_NAME));
break;
}
}
@ -692,7 +692,7 @@ public class RecoveryIT extends AbstractRollingTestCase {
}
}
private void assertNoFileBasedRecovery(String indexName, Predicate<String> targetNode) throws IOException {
private void assertNoopRecoveries(String indexName, Predicate<String> targetNode) throws IOException {
Map<String, Object> recoveries = entityAsMap(client()
.performRequest(new Request("GET", indexName + "/_recovery?detailed=true")));
@ -723,4 +723,55 @@ public class RecoveryIT extends AbstractRollingTestCase {
assertTrue("must find replica", foundReplica);
}
/**
* Tests that, with or without soft-deletes, we perform an operation-based recovery if there are some
* but not too many uncommitted documents (i.e., fewer than 10% of the committed documents, or covered by the
* extra translog) before we upgrade each node. This is important when we move from translog-based to
* retention-lease-based peer recoveries.
*/
public void testOperationBasedRecovery() throws Exception {
final String index = "test_operation_based_recovery";
if (CLUSTER_TYPE == ClusterType.OLD) {
createIndex(index, Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()).build());
ensureGreen(index);
indexDocs(index, 0, randomIntBetween(100, 200));
flush(index, randomBoolean());
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
// uncommitted docs must be less than 10% of committed docs (see IndexSettings#FILE_BASED_RECOVERY_THRESHOLD_SETTING).
indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3));
} else {
ensureGreen(index);
assertNoFileBasedRecovery(index, nodeName ->
CLUSTER_TYPE == ClusterType.UPGRADED
|| nodeName.startsWith(CLUSTER_NAME + "-0")
|| (nodeName.startsWith(CLUSTER_NAME + "-1") && Booleans.parseBoolean(System.getProperty("tests.first_round")) == false));
indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 3));
}
}
/**
* Verifies that once all shard copies are on the new version, we turn off translog retention for indices with soft-deletes.
*/
public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception {
final String index = "turn_off_translog_retention";
if (CLUSTER_TYPE == ClusterType.OLD) {
createIndex(index, Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), randomIntBetween(0, 2))
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
ensureGreen(index);
indexDocs(index, 0, randomIntBetween(100, 200));
flush(index, randomBoolean());
indexDocs(index, randomIntBetween(0, 100), randomIntBetween(0, 100));
}
if (CLUSTER_TYPE == ClusterType.UPGRADED) {
ensureGreen(index);
flush(index, true);
assertEmptyTranslog(index);
}
}
}


@ -252,21 +252,22 @@ public final class IndexSettings {
* Controls how long translog files that are no longer needed for persistence reasons
* will be kept around before being deleted. Keeping more files is useful to increase
* the chance of ops based recoveries for indices with soft-deletes disabled.
* This setting will be ignored if soft-deletes is enabled.
* This setting will be ignored if soft-deletes is used in peer recoveries (default in 7.4).
**/
public static final Setting<TimeValue> INDEX_TRANSLOG_RETENTION_AGE_SETTING =
Setting.timeSetting("index.translog.retention.age",
settings -> INDEX_SOFT_DELETES_SETTING.get(settings) ? TimeValue.MINUS_ONE : TimeValue.timeValueHours(12), TimeValue.MINUS_ONE,
Property.Dynamic, Property.IndexScope);
settings -> shouldDisableTranslogRetention(settings) ? TimeValue.MINUS_ONE : TimeValue.timeValueHours(12),
TimeValue.MINUS_ONE, Property.Dynamic, Property.IndexScope);
/**
* Controls how many translog files that are no longer needed for persistence reasons
* will be kept around before being deleted. Keeping more files is useful to increase
* the chance of ops based recoveries for indices with soft-deletes disabled.
* This setting will be ignored if soft-deletes is enabled.
* This setting will be ignored if soft-deletes is used in peer recoveries (default in 7.4).
**/
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_RETENTION_SIZE_SETTING =
Setting.byteSizeSetting("index.translog.retention.size", settings -> INDEX_SOFT_DELETES_SETTING.get(settings) ? "-1" : "512MB",
Setting.byteSizeSetting("index.translog.retention.size",
settings -> shouldDisableTranslogRetention(settings) ? "-1" : "512MB",
Property.Dynamic, Property.IndexScope);
/**
@ -587,7 +588,7 @@ public final class IndexSettings {
}
private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) {
if (softDeleteEnabled && byteSizeValue.getBytes() >= 0) {
if (shouldDisableTranslogRetention(settings) && byteSizeValue.getBytes() >= 0) {
// ignore the translog retention settings if soft-deletes enabled
this.translogRetentionSize = new ByteSizeValue(-1);
} else {
@ -596,7 +597,7 @@ public final class IndexSettings {
}
private void setTranslogRetentionAge(TimeValue age) {
if (softDeleteEnabled && age.millis() >= 0) {
if (shouldDisableTranslogRetention(settings) && age.millis() >= 0) {
// ignore the translog retention settings if soft-deletes enabled
this.translogRetentionAge = TimeValue.MINUS_ONE;
} else {
@ -784,7 +785,7 @@ public final class IndexSettings {
* Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries
*/
public ByteSizeValue getTranslogRetentionSize() {
assert softDeleteEnabled == false || translogRetentionSize.getBytes() == -1L : translogRetentionSize;
assert shouldDisableTranslogRetention(settings) == false || translogRetentionSize.getBytes() == -1L : translogRetentionSize;
return translogRetentionSize;
}
@ -793,7 +794,7 @@ public final class IndexSettings {
* around
*/
public TimeValue getTranslogRetentionAge() {
assert softDeleteEnabled == false || translogRetentionAge.millis() == -1L : translogRetentionSize;
assert shouldDisableTranslogRetention(settings) == false || translogRetentionAge.millis() == -1L : translogRetentionSize;
return translogRetentionAge;
}
@ -805,6 +806,11 @@ public final class IndexSettings {
return INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING.get(getSettings());
}
private static boolean shouldDisableTranslogRetention(Settings settings) {
return INDEX_SOFT_DELETES_SETTING.get(settings)
&& IndexMetaData.SETTING_INDEX_VERSION_CREATED.get(settings).onOrAfter(Version.V_7_4_0);
}
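A minimal sketch, not part of the patch, of what this gate means in practice: for an index with soft-deletes that was created on or after 7.4, the effective translog retention defaults collapse to -1 (disabled). The standalone class and its main method below are hypothetical; the builders and accessors it calls are the existing ones.

// Hypothetical example: translog retention defaults are disabled for 7.4+ soft-deletes indices.
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;

final class TranslogRetentionDefaultsExample {
    public static void main(String[] args) {
        IndexMetaData metaData = IndexMetaData.builder("test")
            .settings(Settings.builder()
                .put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_7_4_0)
                .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true))
            .numberOfShards(1)
            .numberOfReplicas(0)
            .build();
        IndexSettings indexSettings = new IndexSettings(metaData, Settings.EMPTY);
        // shouldDisableTranslogRetention(settings) is true here, so both defaults are -1.
        assert indexSettings.getTranslogRetentionAge().millis() == -1L;
        assert indexSettings.getTranslogRetentionSize().getBytes() == -1L;
    }
}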
/**
* Returns the generation threshold size. As sequence numbers can cause multiple generations to
* be preserved for rollback purposes, we want to keep the size of individual generations from


@ -66,6 +66,7 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.VersionType;
@ -729,7 +730,7 @@ public abstract class Engine implements Closeable {
/**
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
*/
public abstract Closeable acquireRetentionLock();
public abstract Closeable acquireHistoryRetentionLock(HistorySource historySource);
/**
* Creates a new history snapshot from Lucene for reading operations whose seqno in the requesting seqno range (both inclusive).
@ -742,19 +743,20 @@ public abstract class Engine implements Closeable {
* Creates a new history snapshot for reading operations since {@code startingSeqNo} (inclusive).
* The returned snapshot can be retrieved from either Lucene index or translog files.
*/
public abstract Translog.Snapshot readHistoryOperations(String source,
MapperService mapperService, long startingSeqNo) throws IOException;
public abstract Translog.Snapshot readHistoryOperations(String reason, HistorySource historySource,
MapperService mapperService, long startingSeqNo) throws IOException;
/**
* Returns the estimated number of history operations whose seq# at least {@code startingSeqNo}(inclusive) in this engine.
*/
public abstract int estimateNumberOfHistoryOperations(String source,
MapperService mapperService, long startingSeqNo) throws IOException;
public abstract int estimateNumberOfHistoryOperations(String reason, HistorySource historySource,
MapperService mapperService, long startingSeqNo) throws IOException;
/**
* Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its history (either Lucene or translog)
*/
public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;
public abstract boolean hasCompleteOperationHistory(String reason, HistorySource historySource,
MapperService mapperService, long startingSeqNo) throws IOException;
/**
* Gets the minimum retained sequence number for this engine.
@ -1819,7 +1821,8 @@ public abstract class Engine implements Closeable {
}
}
public void onSettingsChanged() {
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
}
/**
@ -1953,4 +1956,11 @@ public abstract class Engine implements Closeable {
* to advance this marker to at least the given sequence number.
*/
public abstract void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary);
/**
* Whether we should read history operations from translog or Lucene index
*/
public enum HistorySource {
TRANSLOG, INDEX
}
}


@ -68,6 +68,8 @@ import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
@ -538,27 +540,31 @@ public class InternalEngine extends Engine {
* The returned snapshot can be retrieved from either Lucene index or translog files.
*/
@Override
public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
return newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
public Translog.Snapshot readHistoryOperations(String reason, HistorySource historySource,
MapperService mapperService, long startingSeqNo) throws IOException {
if (historySource == HistorySource.INDEX) {
ensureSoftDeletesEnabled();
return newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
} else {
return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
}
return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
}
/**
* Returns the estimated number of history operations whose seq# at least the provided seq# in this engine.
*/
@Override
public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
try (Translog.Snapshot snapshot = newChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo),
public int estimateNumberOfHistoryOperations(String reason, HistorySource historySource,
MapperService mapperService, long startingSeqNo) throws IOException {
if (historySource == HistorySource.INDEX) {
ensureSoftDeletesEnabled();
try (Translog.Snapshot snapshot = newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo),
Long.MAX_VALUE, false)) {
return snapshot.totalOperations();
}
} else {
return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo);
}
return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo);
}
@Override
@ -2483,7 +2489,8 @@ public class InternalEngine extends Engine {
}
}
public void onSettingsChanged() {
@Override
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
mergeScheduler.refreshConfig();
// config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed:
maybePruneDeletes();
@ -2494,10 +2501,9 @@ public class InternalEngine extends Engine {
updateAutoIdTimestamp(Long.MAX_VALUE, true);
}
final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy();
final IndexSettings indexSettings = engineConfig.getIndexSettings();
translogDeletionPolicy.setRetentionAgeInMillis(indexSettings.getTranslogRetentionAge().getMillis());
translogDeletionPolicy.setRetentionSizeInBytes(indexSettings.getTranslogRetentionSize().getBytes());
softDeletesPolicy.setRetentionOperations(indexSettings.getSoftDeleteRetentionOperations());
translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis());
translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes());
softDeletesPolicy.setRetentionOperations(softDeletesRetentionOps);
}
public MergeStats getMergeStats() {
@ -2604,12 +2610,17 @@ public class InternalEngine extends Engine {
return numDocUpdates.count();
}
private void ensureSoftDeletesEnabled() {
if (softDeleteEnabled == false) {
assert false : "index " + shardId.getIndex() + " does not have soft-deletes enabled";
throw new IllegalStateException("index " + shardId.getIndex() + " does not have soft-deletes enabled");
}
}
@Override
public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService,
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
if (softDeleteEnabled == false) {
throw new IllegalStateException("accessing changes snapshot requires soft-deletes enabled");
}
ensureSoftDeletesEnabled();
ensureOpen();
refreshIfNeeded(source, toSeqNo);
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
@ -2631,26 +2642,28 @@ public class InternalEngine extends Engine {
}
@Override
public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
public boolean hasCompleteOperationHistory(String reason, HistorySource historySource,
MapperService mapperService, long startingSeqNo) throws IOException {
if (historySource == HistorySource.INDEX) {
ensureSoftDeletesEnabled();
return getMinRetainedSeqNo() <= startingSeqNo;
}
final long currentLocalCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
// avoid scanning translog if not necessary
if (startingSeqNo > currentLocalCheckpoint) {
return true;
}
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
tracker.markSeqNoAsProcessed(operation.seqNo());
} else {
final long currentLocalCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
// avoid scanning translog if not necessary
if (startingSeqNo > currentLocalCheckpoint) {
return true;
}
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
tracker.markSeqNoAsProcessed(operation.seqNo());
}
}
}
return tracker.getProcessedCheckpoint() >= currentLocalCheckpoint;
}
return tracker.getProcessedCheckpoint() >= currentLocalCheckpoint;
}
/**
@ -2658,13 +2671,14 @@ public class InternalEngine extends Engine {
* Operations whose seq# are at least this value should exist in the Lucene index.
*/
public final long getMinRetainedSeqNo() {
assert softDeleteEnabled : Thread.currentThread().getName();
ensureSoftDeletesEnabled();
return softDeletesPolicy.getMinRetainedSeqNo();
}
@Override
public Closeable acquireRetentionLock() {
if (softDeleteEnabled) {
public Closeable acquireHistoryRetentionLock(HistorySource historySource) {
if (historySource == HistorySource.INDEX) {
ensureSoftDeletesEnabled();
return softDeletesPolicy.acquireRetentionLock();
} else {
return translog.acquireRetentionLock();


@ -307,7 +307,7 @@ public class ReadOnlyEngine extends Engine {
}
@Override
public Closeable acquireRetentionLock() {
public Closeable acquireHistoryRetentionLock(HistorySource historySource) {
return () -> {};
}
@ -317,21 +317,24 @@ public class ReadOnlyEngine extends Engine {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled() == false) {
throw new IllegalStateException("accessing changes snapshot requires soft-deletes enabled");
}
return readHistoryOperations(source, mapperService, fromSeqNo);
}
@Override
public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
return newEmptySnapshot();
}
@Override
public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) throws IOException {
public Translog.Snapshot readHistoryOperations(String reason, HistorySource historySource,
MapperService mapperService, long startingSeqNo) {
return newEmptySnapshot();
}
@Override
public int estimateNumberOfHistoryOperations(String reason, HistorySource historySource,
MapperService mapperService, long startingSeqNo) {
return 0;
}
@Override
public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException {
public boolean hasCompleteOperationHistory(String reason, HistorySource historySource,
MapperService mapperService, long startingSeqNo) {
// we can do operation-based recovery if we don't have to replay any operation.
return startingSeqNo > seqNoStats.getMaxSeqNo();
}


@ -906,9 +906,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;
this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_6_0) ||
this.hasAllPeerRecoveryRetentionLeases = indexSettings.isSoftDeleteEnabled() &&
(indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_6_0) ||
(indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) &&
indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN);
indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN));
this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings());
this.safeCommitInfoSupplier = safeCommitInfoSupplier;
assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
@ -1363,6 +1364,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
assert invariant();
}
public synchronized boolean hasAllPeerRecoveryRetentionLeases() {
return hasAllPeerRecoveryRetentionLeases;
}
/**
* Create any required peer-recovery retention leases that do not currently exist because we just did a rolling upgrade from a version
* prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases.


@ -67,6 +67,7 @@ import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -159,6 +160,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -270,6 +272,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final AtomicLong lastSearcherAccess = new AtomicLong();
private final AtomicReference<Translog.Location> pendingRefreshLocation = new AtomicReference<>();
private volatile boolean useRetentionLeasesInPeerRecovery;
public IndexShard(
final ShardRouting shardRouting,
@ -366,6 +369,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
refreshListeners = buildRefreshListeners();
lastSearcherAccess.set(threadPool.relativeTimeInMillis());
persistMetadata(path, indexSettings, shardRouting, null, logger);
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
}
public ThreadPool getThreadPool() {
@ -602,6 +606,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
if (newRouting.equals(currentRouting) == false) {
indexEventListener.shardRoutingChanged(this, currentRouting, newRouting);
}
if (indexSettings.isSoftDeleteEnabled() && useRetentionLeasesInPeerRecovery == false) {
final RetentionLeases retentionLeases = replicationTracker.getRetentionLeases();
final Set<ShardRouting> shardRoutings = new HashSet<>(routingTable.getShards());
shardRoutings.addAll(routingTable.assignedShards()); // include relocation targets
if (shardRoutings.stream().allMatch(
shr -> shr.assignedToNode() && retentionLeases.contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(shr)))) {
useRetentionLeasesInPeerRecovery = true;
turnOffTranslogRetention();
}
}
}
/**
@ -1902,38 +1917,63 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
public void onSettingsChanged() {
Engine engineOrNull = getEngineOrNull();
if (engineOrNull != null) {
engineOrNull.onSettingsChanged();
final boolean useRetentionLeasesInPeerRecovery = this.useRetentionLeasesInPeerRecovery;
engineOrNull.onSettingsChanged(
useRetentionLeasesInPeerRecovery ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(),
useRetentionLeasesInPeerRecovery ? new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(),
indexSettings.getSoftDeleteRetentionOperations()
);
}
}
private void turnOffTranslogRetention() {
logger.debug("turn off the translog retention for the replication group {} " +
"as it starts using retention leases exclusively in peer recoveries", shardId);
// Off to the generic threadPool as pruning the delete tombstones can be expensive.
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (state != IndexShardState.CLOSED) {
logger.warn("failed to turn off translog retention", e);
}
}
@Override
protected void doRun() {
onSettingsChanged();
trimTranslog();
}
});
}
/**
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
*/
public Closeable acquireRetentionLock() {
return getEngine().acquireRetentionLock();
public Closeable acquireHistoryRetentionLock(Engine.HistorySource source) {
return getEngine().acquireHistoryRetentionLock(source);
}
/**
* Returns the estimated number of history operations whose seq# at least the provided seq# in this shard.
*/
public int estimateNumberOfHistoryOperations(String source, long startingSeqNo) throws IOException {
return getEngine().estimateNumberOfHistoryOperations(source, mapperService, startingSeqNo);
public int estimateNumberOfHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException {
return getEngine().estimateNumberOfHistoryOperations(reason, source, mapperService, startingSeqNo);
}
/**
* Creates a new history snapshot for reading operations since the provided starting seqno (inclusive).
* The returned snapshot can be retrieved from either Lucene index or translog files.
*/
public Translog.Snapshot getHistoryOperations(String source, long startingSeqNo) throws IOException {
return getEngine().readHistoryOperations(source, mapperService, startingSeqNo);
public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException {
return getEngine().readHistoryOperations(reason, source, mapperService, startingSeqNo);
}
/**
* Checks if we have a completed history of operations since the given starting seqno (inclusive).
* This method should be called after acquiring the retention lock; See {@link #acquireRetentionLock()}
* This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock(Engine.HistorySource)}
*/
public boolean hasCompleteHistoryOperations(String source, long startingSeqNo) throws IOException {
return getEngine().hasCompleteOperationHistory(source, mapperService, startingSeqNo);
public boolean hasCompleteHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException {
return getEngine().hasCompleteOperationHistory(reason, source, mapperService, startingSeqNo);
}
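For illustration, a hedged usage sketch of the new IndexShard history API: choose the history source from the index settings, hold the matching retention lock while reading, and consume the snapshot under that lock. The helper class and the "example" reason string are hypothetical; the method names are the ones added in this change.

// Hypothetical usage sketch: count operations since a seqno while holding the
// matching retention lock so the chosen history cannot be trimmed underneath us.
import java.io.Closeable;
import java.io.IOException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;

final class HistoryReadExample {
    static int countOpsSince(IndexShard shard, long startingSeqNo) throws IOException {
        final Engine.HistorySource source = shard.indexSettings().isSoftDeleteEnabled()
            ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG;
        try (Closeable ignored = shard.acquireHistoryRetentionLock(source);
             Translog.Snapshot snapshot = shard.getHistoryOperations("example", source, startingSeqNo)) {
            int ops = 0;
            while (snapshot.next() != null) {
                ops++;
            }
            return ops;
        }
    }
}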
/**
@ -2122,9 +2162,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
assert assertPrimaryMode();
verifyNotClosed();
ensureSoftDeletesEnabled("retention leases");
try (Closeable ignore = acquireRetentionLock()) {
try (Closeable ignore = acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) {
final long actualRetainingSequenceNumber =
retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber;
retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber;
return replicationTracker.addRetentionLease(id, actualRetainingSequenceNumber, source, listener);
} catch (final IOException e) {
throw new AssertionError(e);
@ -2144,7 +2184,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
assert assertPrimaryMode();
verifyNotClosed();
ensureSoftDeletesEnabled("retention leases");
try (Closeable ignore = acquireRetentionLock()) {
try (Closeable ignore = acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) {
final long actualRetainingSequenceNumber =
retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber;
return replicationTracker.renewRetentionLease(id, actualRetainingSequenceNumber, source);
@ -2627,6 +2667,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return replicationTracker.getPeerRecoveryRetentionLeases();
}
public boolean useRetentionLeasesInPeerRecovery() {
return useRetentionLeasesInPeerRecovery;
}
private SafeCommitInfo getSafeCommitInfo() {
final Engine engine = getEngineOrNull();
return engine == null ? SafeCommitInfo.EMPTY : engine.getSafeCommitInfo();


@ -36,6 +36,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.tasks.Task;
@ -90,7 +91,9 @@ public class PrimaryReplicaSyncer {
// Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
// Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible
// Also fail the resync early if the shard is shutting down
snapshot = indexShard.getHistoryOperations("resync", startingSeqNo);
snapshot = indexShard.getHistoryOperations("resync",
indexShard.indexSettings.isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG,
startingSeqNo);
final Translog.Snapshot originalSnapshot = snapshot;
final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
@Override


@ -171,22 +171,28 @@ public class RecoverySourceHandler {
ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)) : null);
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ",
shard, cancellableThreads, logger);
final Closeable retentionLock = shard.acquireRetentionLock();
final Engine.HistorySource historySource;
if (shard.useRetentionLeasesInPeerRecovery() || retentionLeaseRef.get() != null) {
historySource = Engine.HistorySource.INDEX;
} else {
historySource = Engine.HistorySource.TRANSLOG;
}
final Closeable retentionLock = shard.acquireHistoryRetentionLock(historySource);
resources.add(retentionLock);
final long startingSeqNo;
final boolean isSequenceNumberBasedRecovery
= request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
&& isTargetSameHistory()
&& shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo())
&& (softDeletesEnabled == false
|| (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo()));
&& shard.hasCompleteHistoryOperations("peer-recovery", historySource, request.startingSeqNo())
&& (historySource == Engine.HistorySource.TRANSLOG ||
(retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo()));
// NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease,
// because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's
// possible there are other cases where we cannot satisfy all leases, because that's not a property we currently expect to hold.
// Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery
// without having a complete history.
if (isSequenceNumberBasedRecovery && softDeletesEnabled) {
if (isSequenceNumberBasedRecovery && retentionLeaseRef.get() != null) {
// all the history we need is retained by an existing retention lease, so we do not need a separate retention lock
retentionLock.close();
logger.trace("history is retained by {}", retentionLeaseRef.get());
@ -205,7 +211,11 @@ public class RecoverySourceHandler {
if (isSequenceNumberBasedRecovery) {
logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
startingSeqNo = request.startingSeqNo();
sendFileStep.onResponse(SendFileResult.EMPTY);
if (softDeletesEnabled && retentionLeaseRef.get() == null) {
createRetentionLease(startingSeqNo, ActionListener.map(sendFileStep, ignored -> SendFileResult.EMPTY));
} else {
sendFileStep.onResponse(SendFileResult.EMPTY);
}
} else {
final Engine.IndexCommitRef safeCommitRef;
try {
@ -231,7 +241,7 @@ public class RecoverySourceHandler {
logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo);
try {
final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo);
final Releasable releaseStore = acquireStore(shard.store());
resources.add(releaseStore);
sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {
@ -284,7 +294,8 @@ public class RecoverySourceHandler {
sendFileStep.whenComplete(r -> {
assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]");
// For a sequence based recovery, the target can keep its local translog
prepareTargetForTranslog(shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo), prepareEngineStep);
prepareTargetForTranslog(
shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo), prepareEngineStep);
}, onFailure);
prepareEngineStep.whenComplete(prepareEngineTime -> {
@ -300,14 +311,10 @@ public class RecoverySourceHandler {
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
logger.trace("snapshot translog for recovery; current size is [{}]",
shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo));
final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo);
shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo));
final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", historySource, startingSeqNo);
resources.add(phase2Snapshot);
if (softDeletesEnabled == false || isSequenceNumberBasedRecovery == false) {
// we can release the retention lock here because the snapshot itself will retain the required operations.
retentionLock.close();
}
retentionLock.close();
// we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values
// are at least as high as the corresponding values on the primary when any of these operations were executed on it.


@ -317,7 +317,9 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
private boolean hasUncommittedOperations() throws IOException {
long localCheckpointOfCommit = Long.parseLong(indexShard.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
return indexShard.estimateNumberOfHistoryOperations("peer-recovery", localCheckpointOfCommit + 1) > 0;
return indexShard.estimateNumberOfHistoryOperations("peer-recovery",
indexShard.indexSettings().isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG,
localCheckpointOfCommit + 1) > 0;
}
@Override


@ -364,7 +364,7 @@ public class IndexSettingsTests extends ESTestCase {
.build());
settings = new IndexSettings(metaData, Settings.EMPTY);
assertEquals(IndexSettings.MAX_ADJACENCY_MATRIX_FILTERS_SETTING.get(Settings.EMPTY).intValue(),
settings.getMaxAdjacencyMatrixFilters());
settings.getMaxAdjacencyMatrixFilters());
assertWarnings("[index.max_adjacency_matrix_filters] setting was deprecated in Elasticsearch and will be removed in a "
+ "future release! See the breaking changes documentation for the next major version.");
}
@ -583,7 +583,7 @@ public class IndexSettingsTests extends ESTestCase {
public void testIgnoreTranslogRetentionSettingsIfSoftDeletesEnabled() {
Settings.Builder settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomVersionBetween(random(), Version.V_7_0_0, Version.CURRENT));
.put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomVersionBetween(random(), Version.V_7_4_0, Version.CURRENT));
if (randomBoolean()) {
settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomPositiveTimeValue());
}


@ -196,6 +196,7 @@ import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
@ -380,7 +381,8 @@ public class InternalEngineTests extends EngineTestCase {
assertThat(segments.get(1).getDeletedDocs(), equalTo(0));
assertThat(segments.get(1).isCompound(), equalTo(true));
engine.onSettingsChanged();
engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(),
indexSettings.getSoftDeleteRetentionOperations());
ParsedDocument doc4 = testParsedDocument("4", null, testDocumentWithTextField(), B_3, null);
engine.index(indexForDoc(doc4));
engine.refresh("test");
@ -1628,7 +1630,8 @@ public class InternalEngineTests extends EngineTestCase {
}
settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0);
indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
engine.onSettingsChanged();
engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(),
indexSettings.getSoftDeleteRetentionOperations());
globalCheckpoint.set(localCheckpoint);
engine.syncTranslog();
@ -1719,7 +1722,8 @@ public class InternalEngineTests extends EngineTestCase {
}
settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0);
indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
engine.onSettingsChanged();
engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(),
indexSettings.getSoftDeleteRetentionOperations());
// If we already merged down to 1 segment, then the next force-merge will be a noop. We need to add an extra segment to make
// merges happen so we can verify that _recovery_source are pruned. See: https://github.com/elastic/elasticsearch/issues/41628.
final int numSegments;
@ -5056,7 +5060,8 @@ public class InternalEngineTests extends EngineTestCase {
.settings(Settings.builder().put(indexSettings.getSettings())
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build();
indexSettings.updateIndexMetaData(indexMetaData);
engine.onSettingsChanged();
engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(),
indexSettings.getSoftDeleteRetentionOperations());
assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs));
assertThat(engine.shouldPeriodicallyFlush(), equalTo(true));
engine.flush();
@ -5104,7 +5109,8 @@ public class InternalEngineTests extends EngineTestCase {
.settings(Settings.builder().put(indexSettings.getSettings())
.put(IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING.getKey(), "0b")).build();
indexSettings.updateIndexMetaData(indexMetaData);
engine.onSettingsChanged();
engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(),
indexSettings.getSoftDeleteRetentionOperations());
assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(1));
assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
doc = testParsedDocument(Integer.toString(1), null, testDocumentWithTextField(), SOURCE, null);
@ -5129,7 +5135,8 @@ public class InternalEngineTests extends EngineTestCase {
.put(IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.getKey(), generationThreshold + "b")
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build();
indexSettings.updateIndexMetaData(indexMetaData);
engine.onSettingsChanged();
engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(),
indexSettings.getSoftDeleteRetentionOperations());
final int numOps = scaledRandomIntBetween(100, 10_000);
for (int i = 0; i < numOps; i++) {
final long localCheckPoint = engine.getProcessedLocalCheckpoint();
@ -5157,7 +5164,8 @@ public class InternalEngineTests extends EngineTestCase {
.settings(Settings.builder().put(indexSettings.getSettings())
.put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), TimeValue.timeValueMillis(1))).build();
engine.engineConfig.getIndexSettings().updateIndexMetaData(indexMetaData);
engine.onSettingsChanged();
engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(),
indexSettings.getSoftDeleteRetentionOperations());
ParsedDocument document = testParsedDocument(Integer.toString(0), null, testDocumentWithTextField(), SOURCE, null);
final Engine.Index doc = new Engine.Index(newUid(document), document, UNASSIGNED_SEQ_NO, 0,
Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(),
@ -5427,7 +5435,8 @@ public class InternalEngineTests extends EngineTestCase {
if (rarely()) {
settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10));
indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
engine.onSettingsChanged();
engine.onSettingsChanged(indexSettings.getTranslogRetentionAge(), indexSettings.getTranslogRetentionSize(),
indexSettings.getSoftDeleteRetentionOperations());
}
if (rarely()) {
engine.refresh("test");
@ -5440,7 +5449,7 @@ public class InternalEngineTests extends EngineTestCase {
if (rarely()) {
engine.forceMerge(randomBoolean());
}
try (Closeable ignored = engine.acquireRetentionLock()) {
try (Closeable ignored = engine.acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) {
long minRetainSeqNos = engine.getMinRetainedSeqNo();
assertThat(minRetainSeqNos, lessThanOrEqualTo(globalCheckpoint.get() + 1));
Long[] expectedOps = existingSeqNos.stream().filter(seqno -> seqno >= minRetainSeqNos).toArray(Long[]::new);
@ -5721,9 +5730,9 @@ public class InternalEngineTests extends EngineTestCase {
IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(Settings.builder().
put(defaultSettings.getSettings()).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false)).build());
try (InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null))) {
IllegalStateException error = expectThrows(IllegalStateException.class,
AssertionError error = expectThrows(AssertionError.class,
() -> engine.newChangesSnapshot("test", createMapperService("test"), 0, randomNonNegativeLong(), randomBoolean()));
assertThat(error.getMessage(), equalTo("accessing changes snapshot requires soft-deletes enabled"));
assertThat(error.getMessage(), containsString("does not have soft-deletes enabled"));
}
}
}


@ -474,7 +474,8 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
assertThat(snapshot.totalOperations(), equalTo(0));
}
}
try (Translog.Snapshot snapshot = shard.getHistoryOperations("test", 0)) {
try (Translog.Snapshot snapshot = shard.getHistoryOperations(
"test", shard.indexSettings().isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG, 0)) {
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
}
}
@ -492,7 +493,8 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(Collections.singletonList(noop2)));
}
}
try (Translog.Snapshot snapshot = shard.getHistoryOperations("test", 0)) {
try (Translog.Snapshot snapshot = shard.getHistoryOperations(
"test", shard.indexSettings().isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG, 0)) {
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
}
}
@ -589,7 +591,8 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
shards.promoteReplicaToPrimary(replica2).get();
logger.info("--> Recover replica3 from replica2");
recoverReplica(replica3, replica2, true);
try (Translog.Snapshot snapshot = replica3.getHistoryOperations("test", 0)) {
try (Translog.Snapshot snapshot = replica3.getHistoryOperations(
"test", replica3.indexSettings().isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG, 0)) {
assertThat(snapshot.totalOperations(), equalTo(initDocs + 1));
final List<Translog.Operation> expectedOps = new ArrayList<>(initOperations);
expectedOps.add(op2);


@ -32,6 +32,7 @@ import org.elasticsearch.index.seqno.RetentionLeaseUtils;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.VersionUtils;
import java.util.ArrayList;
import java.util.List;
@ -147,6 +148,30 @@ public class RetentionLeasesReplicationTests extends ESIndexLevelReplicationTest
}
}
public void testTurnOffTranslogRetentionAfterAllShardStarted() throws Exception {
final Settings.Builder settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
if (randomBoolean()) {
settings.put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomIndexCompatibleVersion(random()));
}
try (ReplicationGroup group = createGroup(between(1, 2), settings.build())) {
group.startAll();
group.indexDocs(randomIntBetween(1, 10));
for (IndexShard shard : group) {
shard.updateShardState(shard.routingEntry(), shard.getOperationPrimaryTerm(), null, 1L,
group.getPrimary().getReplicationGroup().getInSyncAllocationIds(),
group.getPrimary().getReplicationGroup().getRoutingTable());
}
group.syncGlobalCheckpoint();
group.flush();
assertBusy(() -> {
// we turn off the translog retention policy using the generic threadPool
for (IndexShard shard : group) {
assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(0));
}
});
}
}
static final class SyncRetentionLeasesResponse extends ReplicationResponse {
final RetentionLeaseSyncAction.Request syncRequest;
SyncRetentionLeasesResponse(RetentionLeaseSyncAction.Request syncRequest) {


@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
@ -109,7 +110,7 @@ public class RetentionLeaseIT extends ESIntegTestCase {
final CountDownLatch latch = new CountDownLatch(1);
final ActionListener<ReplicationResponse> listener = countDownLatchListener(latch);
// simulate a peer recovery which locks the soft deletes policy on the primary
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLock() : () -> {};
final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock(Engine.HistorySource.INDEX) : () -> {};
currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener));
latch.await();
retentionLock.close();
@ -159,7 +160,7 @@ public class RetentionLeaseIT extends ESIntegTestCase {
final CountDownLatch latch = new CountDownLatch(1);
final ActionListener<ReplicationResponse> listener = countDownLatchListener(latch);
// simulate a peer recovery which locks the soft deletes policy on the primary
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLock() : () -> {};
final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock(Engine.HistorySource.INDEX) : () -> {};
currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener));
latch.await();
retentionLock.close();
@ -170,7 +171,7 @@ public class RetentionLeaseIT extends ESIntegTestCase {
final CountDownLatch latch = new CountDownLatch(1);
primary.removeRetentionLease(id, countDownLatchListener(latch));
// simulate a peer recovery which locks the soft deletes policy on the primary
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLock() : () -> {};
final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock(Engine.HistorySource.INDEX) : () -> {};
currentRetentionLeases.remove(id);
latch.await();
retentionLock.close();


@ -40,6 +40,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.TestTranslog;
@ -63,6 +64,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@ -210,7 +212,9 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
operations.add(new Translog.Index(
"_doc", Integer.toString(i), randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : i, primaryTerm, new byte[]{1}));
}
doReturn(TestTranslog.newSnapshotFromOperations(operations)).when(shard).getHistoryOperations(anyString(), anyLong());
Engine.HistorySource source =
shard.indexSettings.isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG;
doReturn(TestTranslog.newSnapshotFromOperations(operations)).when(shard).getHistoryOperations(anyString(), eq(source), anyLong());
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
List<Translog.Operation> sentOperations = new ArrayList<>();
PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> {


@ -245,7 +245,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId());
shards.recoverReplica(newReplica);
shards.assertAllEqual(3);
try (Translog.Snapshot snapshot = newReplica.getHistoryOperations("test", 0)) {
try (Translog.Snapshot snapshot = newReplica.getHistoryOperations("test", Engine.HistorySource.INDEX, 0)) {
assertThat(snapshot, SnapshotMatchers.size(6));
}
}


@ -55,6 +55,7 @@ import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -1067,4 +1068,78 @@ public abstract class ESRestTestCase extends ESTestCase {
return false;
}
}
public void flush(String index, boolean force) throws IOException {
logger.info("flushing index {} force={}", index, force);
final Request flushRequest = new Request("POST", "/" + index + "/_flush");
flushRequest.addParameter("force", Boolean.toString(force));
flushRequest.addParameter("wait_if_ongoing", "true");
assertOK(client().performRequest(flushRequest));
}
/**
* Asserts that replicas on nodes satisfying the {@code targetNode} predicate have performed operation-based recoveries.
*/
public void assertNoFileBasedRecovery(String indexName, Predicate<String> targetNode) throws IOException {
Map<String, Object> recoveries = entityAsMap(client().performRequest(new Request("GET", indexName + "/_recovery?detailed=true")));
@SuppressWarnings("unchecked")
List<Map<String, ?>> shards = (List<Map<String, ?>>) XContentMapValues.extractValue(indexName + ".shards", recoveries);
assertNotNull(shards);
boolean foundReplica = false;
logger.info("index {} recovery stats {}", indexName, shards);
for (Map<String, ?> shard : shards) {
if (shard.get("primary") == Boolean.FALSE && targetNode.test((String) XContentMapValues.extractValue("target.name", shard))) {
List<?> details = (List<?>) XContentMapValues.extractValue("index.files.details", shard);
// once detailed recoveries work, remove this if.
if (details == null) {
long totalFiles = ((Number) XContentMapValues.extractValue("index.files.total", shard)).longValue();
long reusedFiles = ((Number) XContentMapValues.extractValue("index.files.reused", shard)).longValue();
logger.info("total [{}] reused [{}]", totalFiles, reusedFiles);
assertThat("must reuse all files, recoveries [" + recoveries + "]", totalFiles, equalTo(reusedFiles));
} else {
assertNotNull(details);
assertThat(details, Matchers.empty());
}
foundReplica = true;
}
}
assertTrue("must find replica", foundReplica);
}
/**
* Asserts that we do not retain any extra translog for the given index (i.e., translog retention has been turned off)
*/
public void assertEmptyTranslog(String index) throws Exception {
Map<String, Object> stats = entityAsMap(client().performRequest(new Request("GET", index + "/_stats?level=shards")));
assertThat(XContentMapValues.extractValue("indices." + index + ".total.translog.uncommitted_operations", stats), equalTo(0));
assertThat(XContentMapValues.extractValue("indices." + index + ".total.translog.operations", stats), equalTo(0));
}
/**
* Peer recovery retention leases (PRRLs) are renewed and synced to replicas periodically (every 30 seconds). This ensures
* that every PRRL has been renewed to the global checkpoint of the corresponding copy and properly synced to all copies.
*/
public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index) throws Exception {
assertBusy(() -> {
Map<String, Object> stats = entityAsMap(client().performRequest(new Request("GET", index + "/_stats?level=shards")));
@SuppressWarnings("unchecked") Map<String, List<Map<String, ?>>> shards =
(Map<String, List<Map<String, ?>>>) XContentMapValues.extractValue("indices." + index + ".shards", stats);
for (List<Map<String, ?>> shard : shards.values()) {
for (Map<String, ?> copy : shard) {
Integer globalCheckpoint = (Integer) XContentMapValues.extractValue("seq_no.global_checkpoint", copy);
assertNotNull(globalCheckpoint);
@SuppressWarnings("unchecked") List<Map<String, ?>> retentionLeases =
(List<Map<String, ?>>) XContentMapValues.extractValue("retention_leases.leases", copy);
if (retentionLeases == null) {
continue;
}
for (Map<String, ?> retentionLease : retentionLeases) {
if (((String) retentionLease.get("id")).startsWith("peer_recovery/")) {
assertThat(retentionLease.get("retaining_seq_no"), equalTo(globalCheckpoint + 1));
}
}
}
}
}, 60, TimeUnit.SECONDS);
}
}


@ -14,6 +14,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
@ -75,7 +76,7 @@ public class BulkShardOperationsTests extends IndexShardTestCase {
operations,
numOps - 1, followerPrimary, logger);
try (Translog.Snapshot snapshot = followerPrimary.getHistoryOperations("test", 0)) {
try (Translog.Snapshot snapshot = followerPrimary.getHistoryOperations("test", Engine.HistorySource.INDEX, 0)) {
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {