diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 48a3caf0ea3..2a0d89cb1e2 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.engine; import com.carrotsearch.hppc.ObjectIntHashMap; +import org.apache.logging.log4j.Logger; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexDeletionPolicy; import org.apache.lucene.store.Directory; @@ -31,6 +32,7 @@ import java.io.IOException; import java.nio.file.Path; import java.util.Collection; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.function.LongSupplier; @@ -42,6 +44,7 @@ import java.util.function.LongSupplier; * the current global checkpoint except the index commit which has the highest max sequence number among those. */ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { + private final Logger logger; private final TranslogDeletionPolicy translogDeletionPolicy; private final EngineConfig.OpenMode openMode; private final LongSupplier globalCheckpointSupplier; @@ -50,9 +53,10 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint. private volatile IndexCommit lastCommit; // the most recent commit point - CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy, + CombinedDeletionPolicy(EngineConfig.OpenMode openMode, Logger logger, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) { this.openMode = openMode; + this.logger = logger; this.translogDeletionPolicy = translogDeletionPolicy; this.globalCheckpointSupplier = globalCheckpointSupplier; this.startingCommit = startingCommit; @@ -104,8 +108,12 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { * (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit, * the policy can consider the snapshotted commit as a safe commit for recovery even the commit does not have translog. */ - private void keepOnlyStartingCommitOnInit(List commits) { - commits.stream().filter(commit -> startingCommit.equals(commit) == false).forEach(IndexCommit::delete); + private void keepOnlyStartingCommitOnInit(List commits) throws IOException { + for (IndexCommit commit : commits) { + if (startingCommit.equals(commit) == false) { + this.deleteCommit(commit); + } + } assert startingCommit.isDeleted() == false : "Starting commit must not be deleted"; lastCommit = startingCommit; safeCommit = startingCommit; @@ -118,14 +126,22 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { safeCommit = commits.get(keptPosition); for (int i = 0; i < keptPosition; i++) { if (snapshottedCommits.containsKey(commits.get(i)) == false) { - commits.get(i).delete(); + deleteCommit(commits.get(i)); } } updateTranslogDeletionPolicy(); } + private void deleteCommit(IndexCommit commit) throws IOException { + assert commit.isDeleted() == false : "Index commit [" + commitDescription(commit) + "] is deleted twice"; + logger.debug("Delete index commit [{}]", commitDescription(commit)); + commit.delete(); + assert commit.isDeleted() : "Deletion commit [" + commitDescription(commit) + "] was suppressed"; + } + private void updateTranslogDeletionPolicy() throws IOException { assert Thread.holdsLock(this); + logger.debug("Safe commit [{}], last commit [{}]", commitDescription(safeCommit), commitDescription(lastCommit)); assert safeCommit.isDeleted() == false : "The safe commit must not be deleted"; final long minRequiredGen = Long.parseLong(safeCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); assert lastCommit.isDeleted() == false : "The last commit must not be deleted"; @@ -229,6 +245,13 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { return false; } + /** + * Returns a description for a given {@link IndexCommit}. This should be only used for logging and debugging. + */ + public static String commitDescription(IndexCommit commit) throws IOException { + return String.format(Locale.ROOT, "CommitPoint{segment[%s], userData[%s]}", commit.getSegmentsFileName(), commit.getUserData()); + } + /** * A wrapper of an index commit that prevents it from being deleted. */ diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index c98b7763d1d..b01ef12d647 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -184,7 +184,7 @@ public class InternalEngine extends Engine { assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG || startingCommit != null : "Starting commit should be non-null; mode [" + openMode + "]; startingCommit [" + startingCommit + "]"; this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit); - this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, translogDeletionPolicy, + this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, logger, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint, startingCommit); writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit); updateMaxUnsafeAutoIdTimestampFromWriter(writer); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 5920e286aa1..9b4131239ab 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices.recovery; +import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.index.DirectoryReader; @@ -64,6 +65,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.List; +import java.util.StringJoiner; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -320,7 +322,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde final long startingSeqNo; if (metadataSnapshot.size() > 0) { - startingSeqNo = getStartingSeqNo(recoveryTarget); + startingSeqNo = getStartingSeqNo(logger, recoveryTarget); } else { startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; } @@ -354,12 +356,20 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde * @return the starting sequence number or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if obtaining the starting sequence number * failed */ - public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) { + public static long getStartingSeqNo(final Logger logger, final RecoveryTarget recoveryTarget) { try { final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation()); final List existingCommits = DirectoryReader.listCommits(recoveryTarget.store().directory()); final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint); final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo(safeCommit); + if (logger.isTraceEnabled()) { + final StringJoiner descriptionOfExistingCommits = new StringJoiner(","); + for (IndexCommit commit : existingCommits) { + descriptionOfExistingCommits.add(CombinedDeletionPolicy.commitDescription(commit)); + } + logger.trace("Calculate starting seqno based on global checkpoint [{}], safe commit [{}], existing commits [{}]", + globalCheckpoint, CombinedDeletionPolicy.commitDescription(safeCommit), descriptionOfExistingCommits); + } if (seqNoStats.maxSeqNo <= globalCheckpoint) { assert seqNoStats.localCheckpoint <= globalCheckpoint; /* diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index d4af7836810..f663504da9f 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -55,7 +55,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { final AtomicLong globalCheckpoint = new AtomicLong(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( - OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null); + OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null); final LongArrayList maxSeqNoList = new LongArrayList(); final LongArrayList translogGenList = new LongArrayList(); @@ -95,7 +95,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { final UUID translogUUID = UUID.randomUUID(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( - OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null); + OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null); long lastMaxSeqNo = between(1, 1000); long lastTranslogGen = between(1, 20); int safeIndex = 0; @@ -116,6 +116,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { long upper = safeIndex == commitList.size() - 1 ? lastMaxSeqNo : Long.parseLong(commitList.get(safeIndex + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO)) - 1; globalCheckpoint.set(randomLongBetween(lower, upper)); + commitList.forEach(this::resetDeletion); indexPolicy.onCommit(commitList); // Captures and releases some commits int captures = between(0, 5); @@ -144,6 +145,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { } snapshottingCommits.forEach(indexPolicy::releaseCommit); globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE)); + commitList.forEach(this::resetDeletion); indexPolicy.onCommit(commitList); for (int i = 0; i < commitList.size() - 1; i++) { assertThat(commitList.get(i).isDeleted(), equalTo(true)); @@ -159,7 +161,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( - OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null); + OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null); long legacyTranslogGen = randomNonNegativeLong(); IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen); @@ -180,6 +182,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen)); // Make the fresh commit safe. + resetDeletion(legacyCommit); globalCheckpoint.set(randomLongBetween(maxSeqNo, Long.MAX_VALUE)); indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit)); verify(legacyCommit, times(2)).delete(); @@ -192,7 +195,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong()); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( - OPEN_INDEX_CREATE_TRANSLOG, translogPolicy, globalCheckpoint::get, null); + OPEN_INDEX_CREATE_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null); final int invalidCommits = between(1, 10); final List commitList = new ArrayList<>(); @@ -230,7 +233,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { } final IndexCommit startingCommit = randomFrom(commitList); CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( - OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, startingCommit); + OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, startingCommit); indexPolicy.onInit(commitList); for (IndexCommit commit : commitList) { if (commit.equals(startingCommit) == false) { @@ -249,7 +252,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { final UUID translogUUID = UUID.randomUUID(); final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy( - OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null); + OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null); final List commitList = new ArrayList<>(); int totalCommits = between(2, 20); long lastMaxSeqNo = between(1, 1000); @@ -261,6 +264,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { } IndexCommit safeCommit = randomFrom(commitList); globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO))); + commitList.forEach(this::resetDeletion); indexPolicy.onCommit(commitList); if (safeCommit == commitList.get(commitList.size() - 1)) { // Safe commit is the last commit - no need to clean up @@ -274,6 +278,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { // Advanced enough globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE)); assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(true)); + commitList.forEach(this::resetDeletion); indexPolicy.onCommit(commitList); // Safe commit is the last commit - no need to clean up assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen)); @@ -287,17 +292,21 @@ public class CombinedDeletionPolicyTests extends ESTestCase { userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString()); userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen)); - final AtomicBoolean deleted = new AtomicBoolean(); final IndexCommit commit = mock(IndexCommit.class); final Directory directory = mock(Directory.class); when(commit.getUserData()).thenReturn(userData); when(commit.getDirectory()).thenReturn(directory); + resetDeletion(commit); + return commit; + } + + void resetDeletion(IndexCommit commit) { + final AtomicBoolean deleted = new AtomicBoolean(); when(commit.isDeleted()).thenAnswer(args -> deleted.get()); doAnswer(arg -> { deleted.set(true); return null; }).when(commit).delete(); - return commit; } IndexCommit mockLegacyIndexCommit(UUID translogUUID, long translogGen) throws IOException { @@ -306,6 +315,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen)); final IndexCommit commit = mock(IndexCommit.class); when(commit.getUserData()).thenReturn(userData); + resetDeletion(commit); return commit; } } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 31521e33f21..13d5ea189f0 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -33,7 +33,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase { { recoveryEmptyReplica(replica); final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); - assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(0L)); + assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(0L)); recoveryTarget.decRef(); } // Last commit is good - use it. @@ -49,7 +49,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase { replica.updateGlobalCheckpointOnReplica(initDocs - 1, "test"); replica.getTranslog().sync(); final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); - assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(initDocs)); + assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs)); recoveryTarget.decRef(); } // Global checkpoint does not advance, last commit is not good - use the previous commit @@ -63,7 +63,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase { } flushShard(replica); final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); - assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(initDocs)); + assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs)); recoveryTarget.decRef(); } // Advances the global checkpoint, a safe commit also advances @@ -71,7 +71,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase { replica.updateGlobalCheckpointOnReplica(initDocs + moreDocs - 1, "test"); replica.getTranslog().sync(); final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null); - assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(initDocs + moreDocs)); + assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs + moreDocs)); recoveryTarget.decRef(); } } finally { diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index ca9d1728d92..1ce28e16d57 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -462,7 +462,7 @@ public abstract class IndexShardTestCase extends ESTestCase { final Store.MetadataSnapshot snapshot = getMetadataSnapshotOrEmpty(replica); final long startingSeqNo; if (snapshot.size() > 0) { - startingSeqNo = PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget); + startingSeqNo = PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget); } else { startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; }