From 087f182481a381a368704714227d949017f986a1 Mon Sep 17 00:00:00 2001
From: Boaz Leskes
Date: Thu, 8 Jun 2017 09:21:28 +0200
Subject: [PATCH] Translog file recovery should not rely on lucene commits
 (#25005)

When we open a translog, we rely on the `translog.ckp` file to tell us what
the maximum generation file should be, and on the information stored in the
last lucene commit to know the first file we need to recover. This requires
coordination and is currently subject to a race condition: if a node dies
after a lucene commit is made but before we remove the translog generations
that were made unneeded by it, the next time we open the translog we will
ignore those files and never delete them (I have added tests for this).

This PR changes the approach to have the translog store both of those numbers
in `translog.ckp`. This makes the translog more self-contained and easier to
control.

This change also decouples the translog recovery logic from the specific
commit we're opening. This prepares the ground to fully utilize the deletion
policy introduced in #24950: store more translog data than is needed for the
last lucene commit, keep multiple lucene commits around, and be free to
recover from any of them.
---
 .../index/engine/CombinedDeletionPolicy.java  |   4 +
 .../index/engine/InternalEngine.java          |   7 +-
 .../index/translog/Checkpoint.java            |  22 ++-
 .../index/translog/Translog.java              | 131 +++++++++++++----
 .../index/translog/TranslogWriter.java        |  27 +++-
 .../translog/TruncateTranslogCommand.java     |   4 +-
 .../index/engine/InternalEngineTests.java     | 123 ++++++++++------
 .../index/translog/TranslogTests.java         | 133 ++++++++++++++++--
 .../index/translog/TranslogVersionTests.java  |   2 +-
 .../PeerRecoveryTargetServiceTests.java       |   2 +-
 10 files changed, 356 insertions(+), 99 deletions(-)

diff --git a/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
index 68e2865e284..69173cc4216 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
@@ -83,4 +83,8 @@ class CombinedDeletionPolicy extends IndexDeletionPolicy {
     public SnapshotDeletionPolicy getIndexDeletionPolicy() {
         return indexDeletionPolicy;
     }
+
+    public TranslogDeletionPolicy getTranslogDeletionPolicy() {
+        return translogDeletionPolicy;
+    }
 }
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 8c0481d686f..f84f76b537e 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -305,7 +305,8 @@ public class InternalEngine extends Engine {
         Translog.TranslogGeneration translogGeneration = translog.getGeneration();
         final int opsRecovered;
         try {
-            Translog.Snapshot snapshot = translog.newSnapshot();
+            final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
+            Translog.Snapshot snapshot = translog.newSnapshot(translogGen);
             opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot);
         } catch (Exception e) {
             throw new EngineException(shardId, "failed to recover from translog", e);
@@ -321,6 +322,8 @@ public class InternalEngine extends Engine {
         } else if (translog.isCurrent(translogGeneration) == false) {
             commitIndexWriter(indexWriter, translog,
                lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
         }
+        // clean up what's not needed
+        translog.trimUnreferencedReaders();
     }
 
     private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) throws IOException {
@@ -1772,7 +1775,7 @@ public class InternalEngine extends Engine {
      * @param syncId the sync flush ID ({@code null} if not committing a synced flush)
      * @throws IOException if an I/O exception occurs committing the specified writer
      */
-    private void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
+    protected void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
         ensureCanFlush();
         try {
             final long localCheckpoint = seqNoService().getLocalCheckpoint();
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java b/core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java
index ce5cc8e7601..547d5aa499f 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java
@@ -44,6 +44,7 @@ final class Checkpoint {
     final long minSeqNo;
     final long maxSeqNo;
     final long globalCheckpoint;
+    final long minTranslogGeneration;
 
     private static final int INITIAL_VERSION = 1; // start with 1, just to recognize there was some magic serialization logic before
     private static final int CURRENT_VERSION = 2; // introduction of global checkpoints
@@ -58,6 +59,7 @@ final class Checkpoint {
             + Long.BYTES // minimum sequence number, introduced in 6.0.0
             + Long.BYTES // maximum sequence number, introduced in 6.0.0
             + Long.BYTES // global checkpoint, introduced in 6.0.0
+            + Long.BYTES // minimum translog generation in the translog - introduced in 6.0.0
             + CodecUtil.footerLength();
 
     // size of 5.0.0 checkpoint
@@ -76,15 +78,19 @@ final class Checkpoint {
      * @param minSeqNo the current minimum sequence number of all operations in the translog
      * @param maxSeqNo the current maximum sequence number of all operations in the translog
      * @param globalCheckpoint the last-known global checkpoint
+     * @param minTranslogGeneration the minimum generation referenced by the translog at this moment.
      */
-    Checkpoint(long offset, int numOps, long generation, long minSeqNo, long maxSeqNo, long globalCheckpoint) {
-        assert minSeqNo <= maxSeqNo;
+    Checkpoint(long offset, int numOps, long generation, long minSeqNo, long maxSeqNo, long globalCheckpoint, long minTranslogGeneration) {
+        assert minSeqNo <= maxSeqNo : "minSeqNo [" + minSeqNo + "] is higher than maxSeqNo [" + maxSeqNo + "]";
+        assert minTranslogGeneration <= generation :
+            "minTranslogGen [" + minTranslogGeneration + "] is higher than generation [" + generation + "]";
         this.offset = offset;
         this.numOps = numOps;
         this.generation = generation;
         this.minSeqNo = minSeqNo;
         this.maxSeqNo = maxSeqNo;
         this.globalCheckpoint = globalCheckpoint;
+        this.minTranslogGeneration = minTranslogGeneration;
     }
 
     private void write(DataOutput out) throws IOException {
@@ -94,16 +100,18 @@ final class Checkpoint {
         out.writeLong(minSeqNo);
         out.writeLong(maxSeqNo);
         out.writeLong(globalCheckpoint);
+        out.writeLong(minTranslogGeneration);
     }
 
-    static Checkpoint emptyTranslogCheckpoint(final long offset, final long generation, final long globalCheckpoint) {
+    static Checkpoint emptyTranslogCheckpoint(final long offset, final long generation, final long globalCheckpoint,
+                                              long minTranslogGeneration) {
         final long minSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
         final long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
-        return new Checkpoint(offset, 0, generation, minSeqNo, maxSeqNo, globalCheckpoint);
+        return new Checkpoint(offset, 0, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration);
     }
 
     static Checkpoint readCheckpointV6_0_0(final DataInput in) throws IOException {
-        return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), in.readLong(), in.readLong(), in.readLong());
+        return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), in.readLong(), in.readLong(), in.readLong(), in.readLong());
     }
 
     // reads a checksummed checkpoint introduced in ES 5.0.0
@@ -111,7 +119,8 @@ final class Checkpoint {
         final long minSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
         final long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
         final long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
-        return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), minSeqNo, maxSeqNo, globalCheckpoint);
+        final long minTranslogGeneration = -1L;
+        return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration);
     }
 
     @Override
@@ -123,6 +132,7 @@ final class Checkpoint {
                 ", minSeqNo=" + minSeqNo +
                 ", maxSeqNo=" + maxSeqNo +
                 ", globalCheckpoint=" + globalCheckpoint +
+                ", minTranslogGeneration=" + minTranslogGeneration +
                 '}';
     }
 
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index c351f034623..032d26d890a 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -24,6 +24,7 @@ import org.apache.logging.log4j.util.Supplier;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
@@ -55,6 +56,7 @@ import java.nio.file.StandardCopyOption;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -63,7 +65,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.LongSupplier;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
@@ -170,11 +171,12 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
                     && Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning
                 logger.warn("deleted previously created, but not yet committed, next generation [{}]. This can happen due to a tragic exception when creating a new generation", nextTranslogFile.getFileName());
             }
-            this.readers.addAll(recoverFromFiles(deletionPolicy.getMinTranslogGenerationForRecovery(), checkpoint));
+            this.readers.addAll(recoverFromFiles(checkpoint));
             if (readers.isEmpty()) {
                 throw new IllegalStateException("at least one reader must be recovered");
             }
             boolean success = false;
+            current = null;
             try {
                 current = createWriter(checkpoint.generation + 1);
                 success = true;
@@ -192,14 +194,13 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
             final long generation = deletionPolicy.getMinTranslogGenerationForRecovery();
             logger.debug("wipe translog location - creating new translog, starting generation [{}]", generation);
             Files.createDirectories(location);
-            final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, globalCheckpointSupplier.getAsLong());
+            final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, globalCheckpointSupplier.getAsLong(), generation);
             final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME);
             Checkpoint.write(getChannelFactory(), checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
             IOUtils.fsync(checkpointFile, false);
-            current = createWriter(generation);
-
+            current = createWriter(generation, generation);
+            readers.clear();
         }
-        // now that we know which files are there, create a new current one.
     } catch (Exception e) {
         // close the opened translog files if we fail to create a new translog...
        IOUtils.closeWhileHandlingException(current);
@@ -209,29 +210,46 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
     }
 
     /** recover all translog files found on disk */
-    private ArrayList<TranslogReader> recoverFromFiles(long translogFileGeneration, Checkpoint checkpoint) throws IOException {
+    private ArrayList<TranslogReader> recoverFromFiles(Checkpoint checkpoint) throws IOException {
         boolean success = false;
         ArrayList<TranslogReader> foundTranslogs = new ArrayList<>();
        final Path tempFile = Files.createTempFile(location, TRANSLOG_FILE_PREFIX, TRANSLOG_FILE_SUFFIX); // a temp file to copy checkpoint to - note it must be on the same FS otherwise atomic move won't work
         boolean tempFileRenamed = false;
         try (ReleasableLock lock = writeLock.acquire()) {
             logger.debug("open uncommitted translog checkpoint {}", checkpoint);
+
+            final long minGenerationToRecoverFrom;
+            if (checkpoint.minTranslogGeneration < 0) {
+                final Version indexVersionCreated = indexSettings().getIndexVersionCreated();
+                assert indexVersionCreated.before(Version.V_6_0_0_alpha3) :
+                    "no minTranslogGeneration in checkpoint, but index was created with version [" + indexVersionCreated + "]";
+                minGenerationToRecoverFrom = deletionPolicy.getMinTranslogGenerationForRecovery();
+            } else {
+                minGenerationToRecoverFrom = checkpoint.minTranslogGeneration;
+            }
+
             final String checkpointTranslogFile = getFilename(checkpoint.generation);
             // we open files in reverse order in order to validate the translog uuid before we start traversing the translog based on
             // the generation id we found in the lucene commit. This gives better error messages if the wrong
             // translog was found.
             foundTranslogs.add(openReader(location.resolve(checkpointTranslogFile), checkpoint));
-            for (long i = checkpoint.generation - 1; i >= translogFileGeneration; i--) {
+            for (long i = checkpoint.generation - 1; i >= minGenerationToRecoverFrom; i--) {
                 Path committedTranslogFile = location.resolve(getFilename(i));
                 if (Files.exists(committedTranslogFile) == false) {
                     throw new IllegalStateException("translog file doesn't exist with generation: " + i + " recovering from: " +
-                        translogFileGeneration + " checkpoint: " + checkpoint.generation + " - translog ids must be consecutive");
+                        minGenerationToRecoverFrom + " checkpoint: " + checkpoint.generation + " - translog ids must be consecutive");
                 }
                 final TranslogReader reader = openReader(committedTranslogFile, Checkpoint.read(location.resolve(getCommitCheckpointFileName(i))));
                 foundTranslogs.add(reader);
                 logger.debug("recovered local translog from checkpoint {}", checkpoint);
             }
             Collections.reverse(foundTranslogs);
+
+            // when we clean up files, we first update the checkpoint with a new minReferencedTranslog and then delete them;
+            // if we crash just at the wrong moment, it may be that we leave one unreferenced file behind, so we delete it if it's there
+            IOUtils.deleteFilesIgnoringExceptions(location.resolve(getFilename(minGenerationToRecoverFrom - 1)),
+                location.resolve(getCommitCheckpointFileName(minGenerationToRecoverFrom - 1)));
+
             Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(checkpoint.generation));
             if (Files.exists(commitCheckpoint)) {
                 Checkpoint checkpointFromDisk = Checkpoint.read(commitCheckpoint);
@@ -332,6 +350,20 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         }
     }
 
+    /**
+     * Returns the minimum file generation referenced by the translog
+     */
+    long getMinFileGeneration() {
+        try (ReleasableLock ignored = readLock.acquire()) {
+            if (readers.isEmpty()) {
+                return current.getGeneration();
+            } else {
+                return readers.get(0).getGeneration();
+            }
+        }
+    }
+
+
     /**
      * Returns the number of operations in the transaction files that aren't committed to lucene.
      */
@@ -372,7 +404,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         }
     }
 
-
     /**
     * Creates a new translog for the specified generation.
      *
@@ -381,6 +412,18 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
      * @throws IOException if creating the translog failed
      */
     TranslogWriter createWriter(long fileGeneration) throws IOException {
+        return createWriter(fileGeneration, getMinFileGeneration());
+    }
+
+    /**
+     * creates a new writer
+     *
+     * @param fileGeneration        the generation of the file to be written
+     * @param initialMinTranslogGen the minimum translog generation to be written in the first checkpoint. This is
+     *                              needed to solve an initialization problem while constructing an empty translog.
+     *                              With no readers and no current, a call to {@link #getMinFileGeneration()} would not work.
+     */
+    private TranslogWriter createWriter(long fileGeneration, long initialMinTranslogGen) throws IOException {
         final TranslogWriter newFile;
         try {
             newFile = TranslogWriter.create(
@@ -390,7 +433,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
                 location.resolve(getFilename(fileGeneration)),
                 getChannelFactory(),
                 config.getBufferSize(),
-                globalCheckpointSupplier);
+                globalCheckpointSupplier,
+                initialMinTranslogGen,
+                this::getMinFileGeneration);
         } catch (final IOException e) {
             throw new TranslogException(shardId, "failed to create new translog file", e);
         }
@@ -494,12 +539,18 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
      * Snapshots are fixed in time and will not be updated with future operations.
      */
     public Snapshot newSnapshot() {
-        return createSnapshot(Long.MIN_VALUE);
+        try (ReleasableLock ignored = readLock.acquire()) {
+            return newSnapshot(getMinFileGeneration());
+        }
     }
 
-    private Snapshot createSnapshot(long minGeneration) {
+    public Snapshot newSnapshot(long minGeneration) {
         try (ReleasableLock ignored = readLock.acquire()) {
             ensureOpen();
+            if (minGeneration < getMinFileGeneration()) {
+                throw new IllegalArgumentException("requested snapshot generation [" + minGeneration + "] is not available. " +
+                    "Min referenced generation is [" + getMinFileGeneration() + "]");
+            }
             Snapshot[] snapshots = Stream.concat(readers.stream(), Stream.of(current))
                 .filter(reader -> reader.getGeneration() >= minGeneration)
                 .map(BaseTranslogReader::newSnapshot).toArray(Snapshot[]::new);
@@ -673,7 +724,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         /** create a snapshot from this view */
         public Snapshot snapshot() {
             ensureOpen();
-            return Translog.this.createSnapshot(minGeneration);
+            return Translog.this.newSnapshot(minGeneration);
         }
 
         void ensureOpen() {
@@ -1442,30 +1493,58 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
      * Trims unreferenced translog generations by asking {@link TranslogDeletionPolicy} for the minimum
      * required generation
      */
-    public void trimUnreferencedReaders() {
+    public void trimUnreferencedReaders() throws IOException {
         try (ReleasableLock ignored = writeLock.acquire()) {
             if (closed.get()) {
                 // we're shut down, potentially on some tragic event, don't delete anything
                 return;
             }
             long minReferencedGen = deletionPolicy.minTranslogGenRequired();
-            final long minExistingGen = readers.isEmpty() ?
-                current.getGeneration() : readers.get(0).getGeneration();
-            assert minReferencedGen >= minExistingGen :
+            assert minReferencedGen >= getMinFileGeneration() :
                 "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is ["
-                    + minExistingGen + "]";
-            final List<TranslogReader> unreferenced =
-                readers.stream().filter(r -> r.getGeneration() < minReferencedGen).collect(Collectors.toList());
-            for (final TranslogReader unreferencedReader : unreferenced) {
-                final Path translogPath = unreferencedReader.path();
+                    + getMinFileGeneration() + "]";
+            assert minReferencedGen <= currentFileGeneration() :
+                "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] which is higher than the current generation ["
+                    + currentFileGeneration() + "]";
+
+
+            for (Iterator<TranslogReader> iterator = readers.iterator(); iterator.hasNext(); ) {
+                TranslogReader reader = iterator.next();
+                if (reader.getGeneration() >= minReferencedGen) {
+                    break;
+                }
+                iterator.remove();
+                IOUtils.closeWhileHandlingException(reader);
+                final Path translogPath = reader.path();
                 logger.trace("delete translog file [{}], not referenced and not current anymore", translogPath);
-                IOUtils.closeWhileHandlingException(unreferencedReader);
-                IOUtils.deleteFilesIgnoringExceptions(translogPath,
-                    translogPath.resolveSibling(getCommitCheckpointFileName(unreferencedReader.getGeneration())));
+                // The checkpoint is used when opening the translog to know which files should be recovered from.
+                // We now update the checkpoint to ignore the file we are going to remove.
+                // Note that there is a provision in recoverFromFiles to allow for the case where we synced the checkpoint
+                // but crashed before we could delete the file.
+                current.sync();
+                deleteReaderFiles(reader);
             }
-            readers.removeAll(unreferenced);
+            assert readers.isEmpty() == false || current.generation == minReferencedGen :
+                "all readers were cleaned but the minReferenceGen [" + minReferencedGen + "] is not the current writer's gen [" +
+                    current.generation + "]";
+        } catch (Exception ex) {
+            try {
+                closeOnTragicEvent(ex);
+            } catch (final Exception inner) {
+                ex.addSuppressed(inner);
+            }
+            throw ex;
         }
     }
 
+    /**
+     * deletes all files associated with a reader.
+     * Package-private to be able to simulate node failures at this point.
+     */
+    void deleteReaderFiles(TranslogReader reader) {
+        IOUtils.deleteFilesIgnoringExceptions(reader.path(),
+            reader.path().resolveSibling(getCommitCheckpointFileName(reader.getGeneration())));
+    }
+
     void closeFilesIfNoPendingViews() throws IOException {
         try (ReleasableLock ignored = writeLock.acquire()) {
             if (closed.get() && deletionPolicy.pendingViewsCount() == 0) {
diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
index 4a98365e02f..d637c9da79f 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
@@ -71,6 +71,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
     private volatile long maxSeqNo;
 
     private final LongSupplier globalCheckpointSupplier;
+    private final LongSupplier minTranslogGenerationSupplier;
 
     protected final AtomicBoolean closed = new AtomicBoolean(false);
     // lock order synchronized(syncLock) -> synchronized(this)
@@ -85,10 +86,11 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
             final FileChannel channel,
             final Path path,
             final ByteSizeValue bufferSize,
-            final LongSupplier globalCheckpointSupplier) throws IOException {
+            final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier) throws IOException {
         super(initialCheckpoint.generation, channel, path, channel.position());
         this.shardId = shardId;
         this.channelFactory = channelFactory;
+        this.minTranslogGenerationSupplier = minTranslogGenerationSupplier;
         this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channel), bufferSize.bytesAsInt());
         this.lastSyncedCheckpoint = initialCheckpoint;
         this.totalOffset = initialCheckpoint.offset;
@@ -121,7 +123,9 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
             Path file,
             ChannelFactory channelFactory,
             ByteSizeValue bufferSize,
-            final LongSupplier globalCheckpointSupplier) throws IOException {
+            final LongSupplier globalCheckpointSupplier,
+            final long initialMinTranslogGen,
+            final LongSupplier minTranslogGenerationSupplier) throws IOException {
         final BytesRef ref = new BytesRef(translogUUID);
         final int headerLength = getHeaderLength(ref.length);
         final FileChannel channel = channelFactory.open(file);
@@ -132,9 +136,11 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
             writeHeader(out, ref);
             channel.force(true);
             final Checkpoint checkpoint =
-                Checkpoint.emptyTranslogCheckpoint(headerLength, fileGeneration, globalCheckpointSupplier.getAsLong());
+                Checkpoint.emptyTranslogCheckpoint(headerLength, fileGeneration, globalCheckpointSupplier.getAsLong(),
+                    initialMinTranslogGen);
             writeCheckpoint(channelFactory, file.getParent(), checkpoint);
-            return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, globalCheckpointSupplier);
+            return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, globalCheckpointSupplier,
+                minTranslogGenerationSupplier);
         } catch (Exception exception) {
             // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
            // file exists we remove it. We only apply this logic to the checkpoint.generation+1; any other file with a higher generation is an error condition
@@ -242,7 +248,9 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
      * checkpoint has not yet been fsynced
      */
     public boolean syncNeeded() {
-        return totalOffset != lastSyncedCheckpoint.offset || globalCheckpointSupplier.getAsLong() != lastSyncedCheckpoint.globalCheckpoint;
+        return totalOffset != lastSyncedCheckpoint.offset ||
+            globalCheckpointSupplier.getAsLong() != lastSyncedCheckpoint.globalCheckpoint ||
+            minTranslogGenerationSupplier.getAsLong() != lastSyncedCheckpoint.minTranslogGeneration;
     }
 
     @Override
@@ -330,6 +338,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
             final long currentMinSeqNo;
             final long currentMaxSeqNo;
             final long currentGlobalCheckpoint;
+            final long currentMinTranslogGeneration;
             synchronized (this) {
                 ensureOpen();
                 try {
@@ -339,6 +348,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
                     currentMinSeqNo = minSeqNo;
                     currentMaxSeqNo = maxSeqNo;
                     currentGlobalCheckpoint = globalCheckpointSupplier.getAsLong();
+                    currentMinTranslogGeneration = minTranslogGenerationSupplier.getAsLong();
                 } catch (Exception ex) {
                     try {
                         closeWithTragicEvent(ex);
@@ -354,7 +364,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
                 try {
                     channel.force(false);
                     checkpoint =
-                        writeCheckpoint(channelFactory, offsetToSync, opsCounter, currentMinSeqNo, currentMaxSeqNo, currentGlobalCheckpoint, path.getParent(), generation);
+                        writeCheckpoint(channelFactory, offsetToSync, opsCounter, currentMinSeqNo, currentMaxSeqNo,
+                            currentGlobalCheckpoint, currentMinTranslogGeneration, path.getParent(), generation);
                 } catch (Exception ex) {
                     try {
                         closeWithTragicEvent(ex);
@@ -398,9 +409,11 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
             long minSeqNo,
             long maxSeqNo,
             long globalCheckpoint,
+            long minTranslogGeneration,
             Path translogFile,
             long generation) throws IOException {
-        final Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation, minSeqNo, maxSeqNo, globalCheckpoint);
+        final Checkpoint checkpoint =
+            new Checkpoint(syncPosition, numOperations, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration);
         writeCheckpoint(channelFactory, translogFile, checkpoint);
         return checkpoint;
     }
diff --git a/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java b/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java
index ea1f4c13dfd..408691692ca 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java
@@ -168,8 +168,8 @@ public class TruncateTranslogCommand extends EnvironmentAwareCommand {
 
     /** Write a checkpoint file to the given location with the given generation */
     public static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration) throws IOException {
-        Checkpoint emptyCheckpoint =
-            Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration, SequenceNumbersService.UNASSIGNED_SEQ_NO);
+        Checkpoint emptyCheckpoint = Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration,
+            SequenceNumbersService.UNASSIGNED_SEQ_NO, translogGeneration);
         Checkpoint.write(FileChannel::open, filename, emptyCheckpoint,
             StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
        // fsync with metadata here to make sure.
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index bb9ec29f1ad..31a99063fb6 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -224,12 +224,12 @@ public class InternalEngineTests extends ESTestCase {
             codecName = "default";
         }
         defaultSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder()
-                .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us
-                .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), codecName)
-                .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
-                .put(IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.getKey(),
-                        between(10, 10 * IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.get(Settings.EMPTY)))
-                .build()); // TODO randomize more settings
+            .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us
+            .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), codecName)
+            .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+            .put(IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.getKey(),
+                between(10, 10 * IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.get(Settings.EMPTY)))
+            .build()); // TODO randomize more settings
         threadPool = new TestThreadPool(getClass().getName());
         store = createStore();
         storeReplica = createStore();
@@ -272,14 +272,14 @@ public class InternalEngineTests extends ESTestCase {
     public void tearDown() throws Exception {
         super.tearDown();
         IOUtils.close(
-                replicaEngine, storeReplica,
-                engine, store);
+            replicaEngine, storeReplica,
+            engine, store);
         terminate(threadPool);
     }
 
 
     private static Document testDocumentWithTextField() {
-        return testDocumentWithTextField("test");
+        return testDocumentWithTextField("test");
     }
 
     private static Document testDocumentWithTextField(String value) {
@@ -319,6 +319,7 @@ public class InternalEngineTests extends ESTestCase {
     protected Store createStore(final Directory directory) throws IOException {
         return createStore(INDEX_SETTINGS, directory);
     }
+
     protected Store createStore(final IndexSettings indexSettings, final Directory directory) throws IOException {
         final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) {
             @Override
@@ -351,6 +352,7 @@ public class InternalEngineTests extends ESTestCase {
         return createEngine(indexSettings, store, translogPath, mergePolicy, null);
     }
+
    protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
                                          @Nullable IndexWriterFactory indexWriterFactory) throws IOException {
         return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, null);
@@ -392,12 +394,12 @@ public class InternalEngineTests extends ESTestCase {
             @Nullable final Function<EngineConfig, SequenceNumbersService> sequenceNumbersServiceSupplier,
             final EngineConfig config) {
         return new InternalEngine(config) {
-                @Override
-                IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
-                    return (indexWriterFactory != null) ?
-                        indexWriterFactory.createWriter(directory, iwc) :
-                        super.createWriter(directory, iwc);
-                }
+            @Override
+            IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
+                return (indexWriterFactory != null) ?
+                    indexWriterFactory.createWriter(directory, iwc) :
+                    super.createWriter(directory, iwc);
+            }
 
             @Override
             public SequenceNumbersService seqNoService() {
@@ -436,9 +438,9 @@ public class InternalEngineTests extends ESTestCase {
         final List<ReferenceManager.RefreshListener> refreshListenerList = refreshListener == null ? emptyList() : Collections.singletonList(refreshListener);
         EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store,
-                mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
-                IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
-                TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler);
+            mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
+            IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
+            TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler);
 
         return config;
     }
@@ -454,7 +456,7 @@ public class InternalEngineTests extends ESTestCase {
 
     public void testSegments() throws Exception {
         try (Store store = createStore();
-            Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
+             Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
             List<Segment> segments = engine.segments(false);
             assertThat(segments.isEmpty(), equalTo(true));
             assertThat(engine.segmentsStats(false).getCount(), equalTo(0L));
@@ -603,7 +605,7 @@ public class InternalEngineTests extends ESTestCase {
 
     public void testSegmentsWithMergeFlag() throws Exception {
         try (Store store = createStore();
-            Engine engine = createEngine(defaultSettings, store, createTempDir(), new TieredMergePolicy())) {
+             Engine engine = createEngine(defaultSettings, store, createTempDir(), new TieredMergePolicy())) {
             ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null);
             Engine.Index index = indexForDoc(doc);
             engine.index(index);
@@ -686,7 +688,7 @@ public class InternalEngineTests extends ESTestCase {
 
     public void testSegmentsStatsIncludingFileSizes() throws Exception {
         try (Store store = createStore();
-            Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
+             Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
             assertThat(engine.segmentsStats(true).getFileSizes().size(), equalTo(0));
 
             ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
@@ -1162,7 +1164,7 @@ public class InternalEngineTests extends ESTestCase {
 
     public void testSyncedFlush() throws IOException {
         try (Store store = createStore();
-            Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null))) {
+             Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null))) {
             final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
             ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
             engine.index(indexForDoc(doc));
@@ -1172,13 +1174,13 @@ public class InternalEngineTests extends ESTestCase {
             wrongBytes[0] = (byte) ~wrongBytes[0];
             Engine.CommitId wrongId = new Engine.CommitId(wrongBytes);
             assertEquals("should fail to sync flush with wrong id (but no docs)", engine.syncFlush(syncId + "1", wrongId),
-                    Engine.SyncedFlushResult.COMMIT_MISMATCH);
+                Engine.SyncedFlushResult.COMMIT_MISMATCH);
             engine.index(indexForDoc(doc));
             assertEquals("should fail to sync flush with right id but pending doc", engine.syncFlush(syncId + "2", commitID),
-                    Engine.SyncedFlushResult.PENDING_OPERATIONS);
+                Engine.SyncedFlushResult.PENDING_OPERATIONS);
             commitID = engine.flush();
             assertEquals("should succeed to flush commit with right id and no pending doc", engine.syncFlush(syncId, commitID),
-                    Engine.SyncedFlushResult.SUCCESS);
+                Engine.SyncedFlushResult.SUCCESS);
             assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
             assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
         }
@@ -1189,7 +1191,7 @@ public class InternalEngineTests extends ESTestCase {
         for (int i = 0; i < iters; i++) {
             try (Store store = createStore();
                  InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
-                    new LogDocMergePolicy(), null))) {
+                     new LogDocMergePolicy(), null))) {
                 final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
                 Engine.Index doc1 = indexForDoc(testParsedDocument("1", null, testDocumentWithTextField(), B_1, null));
                 engine.index(doc1);
@@ -1208,7 +1210,7 @@ public class InternalEngineTests extends ESTestCase {
                 }
                 Engine.CommitId commitID = engine.flush();
                 assertEquals("should succeed to flush commit with right id and no pending doc", engine.syncFlush(syncId, commitID),
-                        Engine.SyncedFlushResult.SUCCESS);
+                    Engine.SyncedFlushResult.SUCCESS);
                 assertEquals(3, engine.segments(false).size());
 
                 engine.forceMerge(forceMergeFlushes, 1, false, false, false);
@@ -1248,7 +1250,7 @@ public class InternalEngineTests extends ESTestCase {
         engine.index(indexForDoc(doc));
         final Engine.CommitId commitID = engine.flush();
         assertEquals("should succeed to flush commit with right id and no pending doc", engine.syncFlush(syncId, commitID),
-                Engine.SyncedFlushResult.SUCCESS);
+            Engine.SyncedFlushResult.SUCCESS);
         assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
         assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
         EngineConfig config = engine.config();
@@ -1271,7 +1273,7 @@ public class InternalEngineTests extends ESTestCase {
         engine.index(indexForDoc(doc));
         final Engine.CommitId commitID = engine.flush();
         assertEquals("should succeed to flush commit with right id and no pending doc", engine.syncFlush(syncId, commitID),
-                Engine.SyncedFlushResult.SUCCESS);
+            Engine.SyncedFlushResult.SUCCESS);
         assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
         assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
         doc = testParsedDocument("2", null, testDocumentWithTextField(), new BytesArray("{}"), null);
@@ -1307,8 +1309,8 @@ public class InternalEngineTests extends ESTestCase {
 
     public void testForceMerge() throws IOException {
         try (Store store = createStore();
-            Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
-                new LogByteSizeMergePolicy(), null))) { // use log MP here we test some behavior in ESMP
+             Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
+                 new LogByteSizeMergePolicy(), null))) { // use log MP here we test some behavior in ESMP
             int numDocs = randomIntBetween(10, 100);
             for (int i = 0; i < numDocs; i++) {
                 ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null);
@@ -1422,7 +1424,7 @@ public class InternalEngineTests extends ESTestCase {
         final Term id = newUid("1");
         final int startWithSeqNo;
         if (partialOldPrimary) {
-            startWithSeqNo = randomBoolean() ? numOfOps - 1 : randomIntBetween(0, numOfOps - 1);
+            startWithSeqNo = randomBoolean() ? numOfOps - 1 : randomIntBetween(0, numOfOps - 1);
         } else {
             startWithSeqNo = 0;
         }
@@ -1541,7 +1543,8 @@ public class InternalEngineTests extends ESTestCase {
             }
             if (randomBoolean()) {
                 engine.refresh("test");
-            } if (randomBoolean()) {
+            }
+            if (randomBoolean()) {
                 engine.flush();
             }
             firstOp = false;
@@ -1598,9 +1601,9 @@ public class InternalEngineTests extends ESTestCase {
                 try {
                     final Engine.Operation op = ops.get(docOffset);
                     if (op instanceof Engine.Index) {
-                        engine.index((Engine.Index)op);
+                        engine.index((Engine.Index) op);
                     } else {
-                        engine.delete((Engine.Delete)op);
+                        engine.delete((Engine.Delete) op);
                     }
                     if ((docOffset + 1) % 4 == 0) {
                         engine.refresh("test");
@@ -1641,7 +1644,7 @@ public class InternalEngineTests extends ESTestCase {
             final long correctVersion = docDeleted && randomBoolean() ? Versions.MATCH_DELETED : lastOpVersion;
             logger.info("performing [{}]{}{}", op.operationType().name().charAt(0),
-                versionConflict ? " (conflict " + conflictingVersion +")" : "",
+                versionConflict ? " (conflict " + conflictingVersion + ")" : "",
                 versionedOp ? " (versioned " + correctVersion + ")" : "");
             if (op instanceof Engine.Index) {
                 final Engine.Index index = (Engine.Index) op;
@@ -1811,7 +1814,7 @@ public class InternalEngineTests extends ESTestCase {
         assertOpsOnReplica(replicaOps, replicaEngine, true);
         final int opsOnPrimary = assertOpsOnPrimary(primaryOps, finalReplicaVersion, deletedOnReplica, replicaEngine);
         final long currentSeqNo = getSequenceID(replicaEngine,
-                new Engine.Get(false, "type", lastReplicaOp.uid().text(), lastReplicaOp.uid())).v1();
+            new Engine.Get(false, "type", lastReplicaOp.uid().text(), lastReplicaOp.uid())).v1();
         try (Searcher searcher = engine.acquireSearcher("test")) {
             final TotalHitCountCollector collector = new TotalHitCountCollector();
             searcher.searcher().search(new MatchAllDocsQuery(), collector);
@@ -2169,11 +2172,11 @@ public class InternalEngineTests extends ESTestCase {
                     final IndexCommit commit = commitRef.getIndexCommit();
                     Map<String, String> userData = commit.getUserData();
                     long localCheckpoint = userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) ?
-                            Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) :
-                            SequenceNumbersService.NO_OPS_PERFORMED;
+                        Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) :
+                        SequenceNumbersService.NO_OPS_PERFORMED;
                     long maxSeqNo = userData.containsKey(SequenceNumbers.MAX_SEQ_NO) ?
-                            Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)) :
-                            SequenceNumbersService.UNASSIGNED_SEQ_NO;
+                        Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)) :
+                        SequenceNumbersService.UNASSIGNED_SEQ_NO;
                     // local checkpoint and max seq no shouldn't go backwards
                     assertThat(localCheckpoint, greaterThanOrEqualTo(prevLocalCheckpoint));
                     assertThat(maxSeqNo, greaterThanOrEqualTo(prevMaxSeqNo));
@@ -2192,7 +2195,7 @@ public class InternalEngineTests extends ESTestCase {
                         FixedBitSet seqNosBitSet = getSeqNosSet(reader, highestSeqNo);
                         for (int i = 0; i <= localCheckpoint; i++) {
                             assertTrue("local checkpoint [" + localCheckpoint + "], _seq_no [" + i + "] should be indexed",
-                                    seqNosBitSet.get(i));
+                                seqNosBitSet.get(i));
                         }
                     }
                     prevLocalCheckpoint = localCheckpoint;
@@ -2268,7 +2271,7 @@ public class InternalEngineTests extends ESTestCase {
 
     public void testEnableGcDeletes() throws Exception {
         try (Store store = createStore();
-            Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) {
+             Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) {
             engine.config().setEnableGcDeletes(false);
 
             final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
@@ -2341,7 +2344,7 @@ public class InternalEngineTests extends ESTestCase {
 
     private Engine.Index replicaIndexForDoc(ParsedDocument doc, long version, long seqNo,
                                             boolean isRetry) {
-        return new Engine.Index(newUid(doc), doc, seqNo, 1, version, VersionType.EXTERNAL,
+        return new Engine.Index(newUid(doc), doc, seqNo, 1, version, VersionType.EXTERNAL,
                 Engine.Operation.Origin.REPLICA, System.nanoTime(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry);
     }
 
@@ -2484,6 +2487,38 @@ public class InternalEngineTests extends ESTestCase {
         }
     }
 
+    public void testTranslogCleanUpPostCommitCrash() throws Exception {
+        try (Store store = createStore()) {
+            AtomicBoolean throwErrorOnCommit = new AtomicBoolean();
+            final Path translogPath = createTempDir();
+            try (InternalEngine engine = new InternalEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null)) {
+                @Override
+                protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
+                    super.commitIndexWriter(writer, translog, syncId);
+                    if (throwErrorOnCommit.get()) {
+                        throw new RuntimeException("power's out");
+                    }
+                }
+            }) {
+                final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
+                engine.index(indexForDoc(doc1));
+                throwErrorOnCommit.set(true);
+                FlushFailedEngineException e = expectThrows(FlushFailedEngineException.class, engine::flush);
+                assertThat(e.getCause().getMessage(), equalTo("power's out"));
+            }
+            try (InternalEngine engine = new InternalEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null))) {
+                engine.recoverFromTranslog();
+                assertVisibleCount(engine, 1);
+                final long committedGen = Long.valueOf(
+                    engine.getLastCommittedSegmentInfos().getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
+                for (int gen = 1; gen < committedGen; gen++) {
+                    final Path genFile = translogPath.resolve(Translog.getFilename(gen));
+                    assertFalse(genFile + " wasn't cleaned up", Files.exists(genFile));
+                }
+            }
+        }
+    }
+
     public void testSkipTranslogReplay() throws IOException {
         final int numDocs = randomIntBetween(1, 10);
         for (int i = 0; i < numDocs; i++) {
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index d52adf37d6e..b911e9a5a48 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -104,12 +104,14 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
+import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween;
 import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasToString;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
 @LuceneTestCase.SuppressFileSystems("ExtrasFS")
 public class TranslogTests extends ESTestCase {
@@ -141,7 +143,7 @@ public class TranslogTests extends ESTestCase {
         return new Translog(config, translogUUID, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
     }
 
-    private void markCurrentGenAsCommitted(Translog translog) {
+    private void markCurrentGenAsCommitted(Translog translog) throws IOException {
         commit(translog, translog.currentFileGeneration());
     }
 
@@ -150,9 +152,14 @@ public class TranslogTests extends ESTestCase {
         commit(translog, translog.currentFileGeneration());
     }
 
-    private void commit(Translog translog, long genToCommit) {
-        translog.getDeletionPolicy().setMinTranslogGenerationForRecovery(genToCommit);
+    private void commit(Translog translog, long genToCommit) throws IOException {
+        final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
+        deletionPolicy.setMinTranslogGenerationForRecovery(genToCommit);
         translog.trimUnreferencedReaders();
+        if (deletionPolicy.pendingViewsCount() == 0) {
+            assertThat(deletionPolicy.minTranslogGenRequired(), equalTo(genToCommit));
+        }
+        assertThat(translog.getMinFileGeneration(), equalTo(deletionPolicy.minTranslogGenRequired()));
     }
 
     @Override
@@ -484,7 +491,7 @@ public class TranslogTests extends ESTestCase {
     }
 
     public void assertFileIsPresent(Translog translog, long id) {
-        if (Files.exists(translogDir.resolve(Translog.getFilename(id)))) {
+        if (Files.exists(translog.location().resolve(Translog.getFilename(id)))) {
             return;
         }
         fail(Translog.getFilename(id) + " is not present in any location: " + translog.location());
@@ -494,6 +501,15 @@ public class TranslogTests extends ESTestCase {
         assertFalse("translog [" + id + "] still exists", Files.exists(translog.location().resolve(Translog.getFilename(id))));
     }
 
+    private void assertFilePresences(Translog translog) {
+        for (long gen = translog.getMinFileGeneration(); gen < translog.currentFileGeneration(); gen++) {
+            assertFileIsPresent(translog, gen);
+        }
+        for (long gen = 1; gen < translog.getMinFileGeneration(); gen++) {
+            assertFileDeleted(translog, gen);
+        }
+    }
+
     static class LocationOperation implements Comparable<LocationOperation> {
         final Translog.Operation operation;
         final Translog.Location location;
@@ -1015,7 +1031,7 @@ public class TranslogTests extends ESTestCase {
     }
 
     public void testTranslogWriter() throws IOException {
-        final TranslogWriter writer = translog.createWriter(0);
+        final TranslogWriter writer = translog.createWriter(translog.currentFileGeneration() + 1);
         final int numOps = randomIntBetween(8, 128);
         byte[] bytes = new byte[4];
         ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
@@ -1075,7 +1091,7 @@ public class TranslogTests extends ESTestCase {
     }
 
     public void testCloseIntoReader() throws IOException {
-        try (TranslogWriter writer = translog.createWriter(0)) {
+        try (TranslogWriter writer = translog.createWriter(translog.currentFileGeneration() + 1)) {
             final int numOps = randomIntBetween(8, 128);
             final byte[] bytes = new byte[4];
             final ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
@@ -1270,7 +1286,7 @@ public class TranslogTests extends ESTestCase {
         TranslogConfig config = translog.getConfig();
         Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
         Checkpoint read = Checkpoint.read(ckp);
-        Checkpoint corrupted = Checkpoint.emptyTranslogCheckpoint(0, 0, SequenceNumbersService.UNASSIGNED_SEQ_NO);
+        Checkpoint corrupted = Checkpoint.emptyTranslogCheckpoint(0, 0, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0);
         Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
         final String translogUUID = translog.getTranslogUUID();
         final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
@@ -1278,8 +1294,8 @@ public class TranslogTests extends ESTestCase {
             fail("corrupted");
         } catch (IllegalStateException ex) {
             assertEquals("Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3123, " +
-                "numOps=55, generation=2, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-2} but got: Checkpoint{offset=0, numOps=0, " +
-                "generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-2}", ex.getMessage());
+                "numOps=55, generation=2, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-2, minTranslogGeneration=1} but got: Checkpoint{offset=0, numOps=0, " +
+                "generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-2, minTranslogGeneration=0}", ex.getMessage());
         }
         Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
         try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
@@ -1699,6 +1715,91 @@ public class TranslogTests extends ESTestCase {
         }
     }
 
+    /**
+     * Tests the situation where the node crashes after a translog gen was committed to lucene, but before the translog had the chance
+     * to clean up its files.
+     */
+    public void testRecoveryFromAFutureGenerationCleansUp() throws IOException {
+        int translogOperations = randomIntBetween(10, 100);
+        for (int op = 0; op < translogOperations / 2; op++) {
+            translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))));
+            if (rarely()) {
+                translog.rollGeneration();
+            }
+        }
+        translog.rollGeneration();
+        long committedGeneration = randomLongBetween(2, translog.currentFileGeneration());
+        for (int op = translogOperations / 2; op < translogOperations; op++) {
+            translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))));
+            if (rarely()) {
+                translog.rollGeneration();
+            }
+        }
+        // the engine blows up after committing the above generation
+        translog.close();
+        TranslogConfig config = translog.getConfig();
+        final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy();
+        deletionPolicy.setMinTranslogGenerationForRecovery(committedGeneration);
+        translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
+        assertThat(translog.getMinFileGeneration(), equalTo(1L));
+        // no trimming done yet, just recovered
+        for (long gen = 1; gen < translog.currentFileGeneration(); gen++) {
+            assertFileIsPresent(translog, gen);
+        }
+        translog.trimUnreferencedReaders();
+        for (long gen = 1; gen < committedGeneration; gen++) {
+            assertFileDeleted(translog, gen);
+        }
+    }
+
+    /**
+     * Tests the situation where the node crashes while trimming unreferenced translog files, so that some of them were
+     * deleted and some were not.
+     */
+    public void testRecoveryFromFailureOnTrimming() throws IOException {
+        Path tempDir = createTempDir();
+        final FailSwitch fail = new FailSwitch();
+        fail.failNever();
+        final TranslogConfig config = getTranslogConfig(tempDir);
+        final long committedGeneration;
+        final String translogUUID;
+        try (Translog translog = getFailableTranslog(fail, config)) {
+            translogUUID = translog.getTranslogUUID();
+            int translogOperations = randomIntBetween(10, 100);
+            for (int op = 0; op < translogOperations / 2; op++) {
+                translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))));
+                if (rarely()) {
+                    translog.rollGeneration();
+                }
+            }
+            translog.rollGeneration();
+            committedGeneration = randomLongBetween(2, translog.currentFileGeneration());
+            for (int op = translogOperations / 2; op < translogOperations; op++) {
+                translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))));
+                if (rarely()) {
+                    translog.rollGeneration();
+                }
+            }
+            fail.failRandomly();
+            try {
+                commit(translog, committedGeneration);
+            } catch (Exception e) {
+                // expected...
+            }
+        }
+        final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy();
+        deletionPolicy.setMinTranslogGenerationForRecovery(committedGeneration);
+        try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
+            // we don't know exactly when things broke
+            assertThat(translog.getMinFileGeneration(), greaterThanOrEqualTo(1L));
+            assertThat(translog.getMinFileGeneration(), lessThanOrEqualTo(committedGeneration));
+            assertFilePresences(translog);
+            translog.trimUnreferencedReaders();
+            assertThat(translog.getMinFileGeneration(), equalTo(committedGeneration));
+            assertFilePresences(translog);
+        }
+    }
+
     private Translog getFailableTranslog(FailSwitch fail, final TranslogConfig config) throws IOException {
         return getFailableTranslog(fail, config, randomBoolean(), false, null, new TranslogDeletionPolicy());
     }
@@ -1756,6 +1857,16 @@ public class TranslogTests extends ESTestCase {
                 }
             };
         }
+
+        @Override
+        void deleteReaderFiles(TranslogReader reader) {
+            if (fail.fail()) {
+                // simulate going OOM and dying just at the wrong moment.
+                throw new RuntimeException("simulated");
+            } else {
+                super.deleteReaderFiles(reader);
+            }
+        }
     };
 }
@@ -2054,7 +2165,9 @@ public class TranslogTests extends ESTestCase {
             minSeqNo = b;
             maxSeqNo = a;
         }
-        return new Checkpoint(randomLong(), randomInt(), randomLong(), minSeqNo, maxSeqNo, randomNonNegativeLong());
+        final long generation = randomNonNegativeLong();
+        return new Checkpoint(randomLong(), randomInt(), generation, minSeqNo, maxSeqNo, randomNonNegativeLong(),
+            randomLongBetween(1, generation));
     }
 
     public void testCheckpointOnDiskFull() throws IOException {
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java
index d0087495061..f6aafe765f5 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java
@@ -89,7 +89,7 @@ public class TranslogVersionTests extends ESTestCase {
         final long minSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
         final long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
         final Checkpoint checkpoint =
-            new Checkpoint(Files.size(path), 1, id, minSeqNo, maxSeqNo, SequenceNumbersService.UNASSIGNED_SEQ_NO);
+            new Checkpoint(Files.size(path), 1, id, minSeqNo, maxSeqNo, SequenceNumbersService.UNASSIGNED_SEQ_NO, id);
         return TranslogReader.open(channel, path, checkpoint, null);
     }
 }
diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
index 4f0fec4c85e..a2e67858584 100644
--- a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
+++ b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -116,7 +116,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
             generation,
             resolve,
             FileChannel::open,
-            TranslogConfig.DEFAULT_BUFFER_SIZE, () -> globalCheckpoint)) {}
+            TranslogConfig.DEFAULT_BUFFER_SIZE, () -> globalCheckpoint, generation, () -> generation)) {}
         return tempDir;
     }
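
Illustration (not part of the patch): the on-disk change above amounts to one extra
long in the checkpoint file. The self-contained sketch below round-trips the same
seven fields in the same order as Checkpoint.write() and readCheckpointV6_0_0(),
using plain java.io streams. The class name and the use of
DataOutputStream/DataInputStream are assumptions for the example; the real
Checkpoint additionally wraps the payload in a CodecUtil header and checksummed
footer, which is omitted here.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class CheckpointSketch {
        final long offset;
        final int numOps;
        final long generation;
        final long minSeqNo;
        final long maxSeqNo;
        final long globalCheckpoint;
        final long minTranslogGeneration; // new: the first generation needed for recovery

        CheckpointSketch(long offset, int numOps, long generation, long minSeqNo,
                         long maxSeqNo, long globalCheckpoint, long minTranslogGeneration) {
            assert minTranslogGeneration <= generation; // same invariant the patch asserts
            this.offset = offset;
            this.numOps = numOps;
            this.generation = generation;
            this.minSeqNo = minSeqNo;
            this.maxSeqNo = maxSeqNo;
            this.globalCheckpoint = globalCheckpoint;
            this.minTranslogGeneration = minTranslogGeneration;
        }

        // mirrors Checkpoint.write(): fixed-width fields, written in declaration order
        void write(DataOutputStream out) throws IOException {
            out.writeLong(offset);
            out.writeInt(numOps);
            out.writeLong(generation);
            out.writeLong(minSeqNo);
            out.writeLong(maxSeqNo);
            out.writeLong(globalCheckpoint);
            out.writeLong(minTranslogGeneration); // the extra long added by this patch
        }

        // mirrors readCheckpointV6_0_0(): java evaluates constructor arguments left to
        // right, so the reads happen in exactly the order the fields were written
        static CheckpointSketch read(DataInputStream in) throws IOException {
            return new CheckpointSketch(in.readLong(), in.readInt(), in.readLong(),
                in.readLong(), in.readLong(), in.readLong(), in.readLong());
        }

        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            new CheckpointSketch(55, 3, 7, 0, 2, 2, 5).write(new DataOutputStream(bytes));
            CheckpointSketch c = read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            // recovery can now find its starting point without consulting the lucene commit
            System.out.println("recover generations " + c.minTranslogGeneration + " to " + c.generation);
        }
    }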
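
A second sketch, also not part of the patch, of the crash-safety protocol that
trimUnreferencedReaders() and recoverFromFiles() cooperate on: persist the new
minimum referenced generation first, then delete files, so a crash in between
leaves behind only a file that recovery knows to sweep. The file names and the
plain-text checkpoint below are simplified stand-ins for the demo, not the real
translog format.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class TrimProtocolSketch {
        public static void main(String[] args) throws IOException {
            Path dir = Files.createTempDirectory("translog-sketch");
            for (long gen = 1; gen <= 4; gen++) {
                Files.createFile(dir.resolve("translog-" + gen + ".tlog"));
            }
            long minReferencedGen = 3;

            // step 1: make the new minimum durable (stands in for writing translog.ckp
            // with minTranslogGeneration and fsyncing it, as current.sync() does above)
            Files.write(dir.resolve("translog.ckp"), Long.toString(minReferencedGen).getBytes());

            // step 2: delete generations below the persisted minimum; a crash between
            // step 1 and this loop can leave an unreferenced file on disk
            for (long gen = 1; gen < minReferencedGen; gen++) {
                Files.deleteIfExists(dir.resolve("translog-" + gen + ".tlog"));
            }

            // on reopen, recovery re-reads the checkpoint and sweeps the possible
            // leftover, like the deleteFilesIgnoringExceptions(minGen - 1) call in
            // recoverFromFiles above
            long recoveredMin = Long.parseLong(new String(Files.readAllBytes(dir.resolve("translog.ckp"))));
            Files.deleteIfExists(dir.resolve("translog-" + (recoveredMin - 1) + ".tlog"));
            System.out.println("recovering from generation " + recoveredMin);
        }
    }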