diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1404b61b8ec..de13eb10977 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -781,10 +781,14 @@ public class InternalEngine extends Engine { // we need to fail the engine. it might have already been failed before // but we are double-checking it's failed and closed if (indexWriter.isOpen() == false && indexWriter.getTragicException() != null) { - failEngine("already closed by tragic event", indexWriter.getTragicException()); + failEngine("already closed by tragic event on the index writer", indexWriter.getTragicException()); + } else if (translog.isOpen() == false && translog.getTragicException() != null) { + failEngine("already closed by tragic event on the translog", translog.getTragicException()); } return true; - } else if (t != null && indexWriter.isOpen() == false && indexWriter.getTragicException() == t) { + } else if (t != null && + ((indexWriter.isOpen() == false && indexWriter.getTragicException() == t) + || (translog.isOpen() == false && translog.getTragicException() == t))) { // this spot on - we are handling the tragic event exception here so we have to fail the engine // right away failEngine(source, t); diff --git a/core/src/main/java/org/elasticsearch/index/translog/BufferingTranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/BufferingTranslogWriter.java index 6026468973a..a2eb0bff646 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/BufferingTranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/BufferingTranslogWriter.java @@ -48,22 +48,27 @@ public final class BufferingTranslogWriter extends TranslogWriter { public Translog.Location add(BytesReference data) throws IOException { try (ReleasableLock lock = writeLock.acquire()) { ensureOpen(); - operationCounter++; final long offset = totalOffset; if (data.length() >= buffer.length) { flush(); // we use the channel to write, since on windows, writing to the RAF might not be reflected // when reading through the channel - data.writeTo(channel); + try { + data.writeTo(channel); + } catch (Throwable ex) { + closeWithTragicEvent(ex); + throw ex; + } writtenOffset += data.length(); totalOffset += data.length(); - return new Translog.Location(generation, offset, data.length()); + } else { + if (data.length() > buffer.length - bufferCount) { + flush(); + } + data.writeTo(bufferOs); + totalOffset += data.length(); } - if (data.length() > buffer.length - bufferCount) { - flush(); - } - data.writeTo(bufferOs); - totalOffset += data.length(); + operationCounter++; return new Translog.Location(generation, offset, data.length()); } } @@ -71,10 +76,17 @@ public final class BufferingTranslogWriter extends TranslogWriter { protected final void flush() throws IOException { assert writeLock.isHeldByCurrentThread(); if (bufferCount > 0) { + ensureOpen(); // we use the channel to write, since on windows, writing to the RAF might not be reflected // when reading through the channel - Channels.writeToChannel(buffer, 0, bufferCount, channel); - writtenOffset += bufferCount; + final int bufferSize = bufferCount; + try { + Channels.writeToChannel(buffer, 0, bufferSize, channel); + } catch (Throwable ex) { + closeWithTragicEvent(ex); + throw ex; + } + writtenOffset += bufferSize; bufferCount = 0; } } @@ -102,20 +114,28 @@ public final class BufferingTranslogWriter extends TranslogWriter { } @Override - public void sync() throws IOException { - if (!syncNeeded()) { - return; - } - synchronized (this) { + public synchronized void sync() throws IOException { + if (syncNeeded()) { + ensureOpen(); // this call gives a better exception that the incRef if we are closed by a tragic event channelReference.incRef(); try { + final long offsetToSync; + final int opsCounter; try (ReleasableLock lock = writeLock.acquire()) { flush(); - lastSyncedOffset = totalOffset; + offsetToSync = totalOffset; + opsCounter = operationCounter; } // we can do this outside of the write lock but we have to protect from // concurrent syncs - checkpoint(lastSyncedOffset, operationCounter, channelReference); + ensureOpen(); // just for kicks - the checkpoint happens or not either way + try { + checkpoint(offsetToSync, opsCounter, channelReference); + } catch (Throwable ex) { + closeWithTragicEvent(ex); + throw ex; + } + lastSyncedOffset = offsetToSync; } finally { channelReference.decRef(); } diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 35dd895bc2e..4016695dd89 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -115,7 +115,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC private final Path location; private TranslogWriter current; private volatile ImmutableTranslogReader currentCommittingTranslog; - private long lastCommittedTranslogFileGeneration = -1; // -1 is safe as it will not cause an translog deletion. + private volatile long lastCommittedTranslogFileGeneration = -1; // -1 is safe as it will not cause an translog deletion. private final AtomicBoolean closed = new AtomicBoolean(); private final TranslogConfig config; private final String translogUUID; @@ -279,7 +279,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } } - boolean isOpen() { + /** Returns {@code true} if this {@code Translog} is still open. */ + public boolean isOpen() { return closed.get() == false; } @@ -288,10 +289,14 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC if (closed.compareAndSet(false, true)) { try (ReleasableLock lock = writeLock.acquire()) { try { - IOUtils.close(current, currentCommittingTranslog); + current.sync(); } finally { - IOUtils.close(recoveredTranslogs); - recoveredTranslogs.clear(); + try { + IOUtils.close(current, currentCommittingTranslog); + } finally { + IOUtils.close(recoveredTranslogs); + recoveredTranslogs.clear(); + } } } finally { FutureUtils.cancel(syncScheduler); @@ -354,7 +359,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC TranslogWriter createWriter(long fileGeneration) throws IOException { TranslogWriter newFile; try { - newFile = TranslogWriter.create(config.getType(), shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), new OnCloseRunnable(), config.getBufferSize()); + newFile = TranslogWriter.create(config.getType(), shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), new OnCloseRunnable(), config.getBufferSize(), getChannelFactory()); } catch (IOException e) { throw new TranslogException(shardId, "failed to create new translog file", e); } @@ -393,7 +398,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC * @see Index * @see org.elasticsearch.index.translog.Translog.Delete */ - public Location add(Operation operation) throws TranslogException { + public Location add(Operation operation) throws IOException { final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays); try { final BufferedChecksumStreamOutput checksumStreamOutput = new BufferedChecksumStreamOutput(out); @@ -415,7 +420,14 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC assert current.assertBytesAtLocation(location, bytes); return location; } - } catch (AlreadyClosedException ex) { + } catch (AlreadyClosedException | IOException ex) { + if (current.getTragicException() != null) { + try { + close(); + } catch (Exception inner) { + ex.addSuppressed(inner); + } + } throw ex; } catch (Throwable e) { throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e); @@ -429,6 +441,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC * Snapshots are fixed in time and will not be updated with future operations. */ public Snapshot newSnapshot() { + ensureOpen(); try (ReleasableLock lock = readLock.acquire()) { ArrayList toOpen = new ArrayList<>(); toOpen.addAll(recoveredTranslogs); @@ -493,6 +506,15 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC if (closed.get() == false) { current.sync(); } + } catch (AlreadyClosedException | IOException ex) { + if (current.getTragicException() != null) { + try { + close(); + } catch (Exception inner) { + ex.addSuppressed(inner); + } + } + throw ex; } } @@ -520,6 +542,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC public boolean ensureSynced(Location location) throws IOException { try (ReleasableLock lock = readLock.acquire()) { if (location.generation == current.generation) { // if we have a new one it's already synced + ensureOpen(); return current.syncUpTo(location.translogLocation + location.size); } } @@ -548,31 +571,29 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC private final class OnCloseRunnable implements Callback { @Override public void handle(ChannelReference channelReference) { - try (ReleasableLock lock = writeLock.acquire()) { - if (isReferencedGeneration(channelReference.getGeneration()) == false) { - Path translogPath = channelReference.getPath(); - assert channelReference.getPath().getParent().equals(location) : "translog files must be in the location folder: " + location + " but was: " + translogPath; - // if the given translogPath is not the current we can safely delete the file since all references are released - logger.trace("delete translog file - not referenced and not current anymore {}", translogPath); - IOUtils.deleteFilesIgnoringExceptions(translogPath); - IOUtils.deleteFilesIgnoringExceptions(translogPath.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration()))); + if (isReferencedGeneration(channelReference.getGeneration()) == false) { + Path translogPath = channelReference.getPath(); + assert channelReference.getPath().getParent().equals(location) : "translog files must be in the location folder: " + location + " but was: " + translogPath; + // if the given translogPath is not the current we can safely delete the file since all references are released + logger.trace("delete translog file - not referenced and not current anymore {}", translogPath); + IOUtils.deleteFilesIgnoringExceptions(translogPath); + IOUtils.deleteFilesIgnoringExceptions(translogPath.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration()))); - } - try (DirectoryStream stream = Files.newDirectoryStream(location)) { - for (Path path : stream) { - Matcher matcher = PARSE_STRICT_ID_PATTERN.matcher(path.getFileName().toString()); - if (matcher.matches()) { - long generation = Long.parseLong(matcher.group(1)); - if (isReferencedGeneration(generation) == false) { - logger.trace("delete translog file - not referenced and not current anymore {}", path); - IOUtils.deleteFilesIgnoringExceptions(path); - IOUtils.deleteFilesIgnoringExceptions(path.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration()))); - } + } + try (DirectoryStream stream = Files.newDirectoryStream(location)) { + for (Path path : stream) { + Matcher matcher = PARSE_STRICT_ID_PATTERN.matcher(path.getFileName().toString()); + if (matcher.matches()) { + long generation = Long.parseLong(matcher.group(1)); + if (isReferencedGeneration(generation) == false) { + logger.trace("delete translog file - not referenced and not current anymore {}", path); + IOUtils.deleteFilesIgnoringExceptions(path); + IOUtils.deleteFilesIgnoringExceptions(path.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration()))); } } - } catch (IOException e) { - logger.warn("failed to delete unreferenced translog files", e); } + } catch (IOException e) { + logger.warn("failed to delete unreferenced translog files", e); } } } @@ -1294,6 +1315,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC throw new IllegalStateException("already committing a translog with generation: " + currentCommittingTranslog.getGeneration()); } final TranslogWriter oldCurrent = current; + oldCurrent.ensureOpen(); oldCurrent.sync(); currentCommittingTranslog = current.immutableReader(); Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME); @@ -1389,7 +1411,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC private void ensureOpen() { if (closed.get()) { - throw new AlreadyClosedException("translog is already closed"); + throw new AlreadyClosedException("translog is already closed", current.getTragicException()); } } @@ -1400,4 +1422,15 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return outstandingViews.size(); } + TranslogWriter.ChannelFactory getChannelFactory() { + return TranslogWriter.ChannelFactory.DEFAULT; + } + + /** If this {@code Translog} was closed as a side-effect of a tragic exception, + * e.g. disk full while flushing a new segment, this returns the root cause exception. + * Otherwise (no tragic exception has occurred) it returns null. */ + public Throwable getTragicException() { + return current.getTragicException(); + } + } diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java index 590bc319057..d7077fd90ad 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java @@ -140,16 +140,16 @@ public abstract class TranslogReader implements Closeable, Comparable onClose, int bufferSize) throws IOException { + public static TranslogWriter create(Type type, ShardId shardId, String translogUUID, long fileGeneration, Path file, Callback onClose, int bufferSize, ChannelFactory channelFactory) throws IOException { final BytesRef ref = new BytesRef(translogUUID); final int headerLength = CodecUtil.headerLength(TRANSLOG_CODEC) + ref.length + RamUsageEstimator.NUM_BYTES_INT; - final FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW); + final FileChannel channel = channelFactory.open(file); try { // This OutputStreamDataOutput is intentionally not closed because // closing it will close the FileChannel @@ -90,6 +94,12 @@ public class TranslogWriter extends TranslogReader { throw throwable; } } + /** If this {@code TranslogWriter} was closed as a side-effect of a tragic exception, + * e.g. disk full while flushing a new segment, this returns the root cause exception. + * Otherwise (no tragic exception has occurred) it returns null. */ + public Throwable getTragicException() { + return tragedy; + } public enum Type { @@ -118,6 +128,16 @@ public class TranslogWriter extends TranslogReader { } } + protected final void closeWithTragicEvent(Throwable throwable) throws IOException { + try (ReleasableLock lock = writeLock.acquire()) { + if (tragedy == null) { + tragedy = throwable; + } else { + tragedy.addSuppressed(throwable); + } + close(); + } + } /** * add the given bytes to the translog and return the location they were written at @@ -127,9 +147,14 @@ public class TranslogWriter extends TranslogReader { try (ReleasableLock lock = writeLock.acquire()) { ensureOpen(); position = writtenOffset; - data.writeTo(channel); + try { + data.writeTo(channel); + } catch (Throwable e) { + closeWithTragicEvent(e); + throw e; + } writtenOffset = writtenOffset + data.length(); - operationCounter = operationCounter + 1; + operationCounter++;; } return new Translog.Location(generation, position, data.length()); } @@ -143,12 +168,13 @@ public class TranslogWriter extends TranslogReader { /** * write all buffered ops to disk and fsync file */ - public void sync() throws IOException { + public synchronized void sync() throws IOException { // synchronized to ensure only one sync happens a time // check if we really need to sync here... if (syncNeeded()) { try (ReleasableLock lock = writeLock.acquire()) { + ensureOpen(); + checkpoint(writtenOffset, operationCounter, channelReference); lastSyncedOffset = writtenOffset; - checkpoint(lastSyncedOffset, operationCounter, channelReference); } } } @@ -262,15 +288,6 @@ public class TranslogWriter extends TranslogReader { return false; } - @Override - protected final void doClose() throws IOException { - try (ReleasableLock lock = writeLock.acquire()) { - sync(); - } finally { - super.doClose(); - } - } - @Override protected void readBytes(ByteBuffer buffer, long position) throws IOException { try (ReleasableLock lock = readLock.acquire()) { @@ -288,4 +305,20 @@ public class TranslogWriter extends TranslogReader { Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation); Checkpoint.write(checkpointFile, checkpoint, options); } + + static class ChannelFactory { + + static final ChannelFactory DEFAULT = new ChannelFactory(); + + // only for testing until we have a disk-full FileSystemt + public FileChannel open(Path file) throws IOException { + return FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW); + } + } + + protected final void ensureOpen() { + if (isClosed()) { + throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed", tragedy); + } + } } diff --git a/core/src/test/java/org/elasticsearch/index/translog/BufferedTranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/BufferedTranslogTests.java index aab980e975d..a29cc6cf8d0 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/BufferedTranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/BufferedTranslogTests.java @@ -34,13 +34,12 @@ import java.nio.file.Path; public class BufferedTranslogTests extends TranslogTests { @Override - protected Translog create(Path path) throws IOException { + protected TranslogConfig getTranslogConfig(Path path) { Settings build = Settings.settingsBuilder() .put("index.translog.fs.type", TranslogWriter.Type.BUFFERED.name()) .put("index.translog.fs.buffer_size", 10 + randomInt(128 * 1024), ByteSizeUnit.BYTES) .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) .build(); - TranslogConfig translogConfig = new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null); - return new Translog(translogConfig); + return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null); } } diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 26faa02a17d..e35c04dcd6b 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -22,9 +22,11 @@ package org.elasticsearch.index.translog; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.Term; +import org.apache.lucene.mockfile.FilterFileChannel; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.ByteArrayDataOutput; import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.LineFileDocs; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -110,16 +112,19 @@ public class TranslogTests extends ESTestCase { } } - protected Translog create(Path path) throws IOException { + private Translog create(Path path) throws IOException { + return new Translog(getTranslogConfig(path)); + } + + protected TranslogConfig getTranslogConfig(Path path) { Settings build = Settings.settingsBuilder() .put(TranslogConfig.INDEX_TRANSLOG_FS_TYPE, TranslogWriter.Type.SIMPLE.name()) .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) .build(); - TranslogConfig translogConfig = new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null); - return new Translog(translogConfig); + return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null); } - protected void addToTranslogAndList(Translog translog, ArrayList list, Translog.Operation op) { + protected void addToTranslogAndList(Translog translog, ArrayList list, Translog.Operation op) throws IOException { list.add(op); translog.add(op); } @@ -330,7 +335,7 @@ public class TranslogTests extends ESTestCase { } } - public void testSnapshot() { + public void testSnapshot() throws IOException { ArrayList ops = new ArrayList<>(); Translog.Snapshot snapshot = translog.newSnapshot(); assertThat(snapshot, SnapshotMatchers.size(0)); @@ -389,7 +394,7 @@ public class TranslogTests extends ESTestCase { Translog.Snapshot snapshot = translog.newSnapshot(); fail("translog is closed"); } catch (AlreadyClosedException ex) { - assertThat(ex.getMessage(), containsString("translog-1.tlog is already closed can't increment")); + assertEquals(ex.getMessage(), "translog is already closed"); } } @@ -634,7 +639,7 @@ public class TranslogTests extends ESTestCase { final String threadId = "writer_" + i; writers[i] = new Thread(new AbstractRunnable() { @Override - public void doRun() throws BrokenBarrierException, InterruptedException { + public void doRun() throws BrokenBarrierException, InterruptedException, IOException { barrier.await(); int counter = 0; while (run.get()) { @@ -1279,4 +1284,122 @@ public class TranslogTests extends ESTestCase { } } } + + public void testFailFlush() throws IOException { + Path tempDir = createTempDir(); + final AtomicBoolean simulateDiskFull = new AtomicBoolean(); + TranslogConfig config = getTranslogConfig(tempDir); + Translog translog = new Translog(config) { + @Override + TranslogWriter.ChannelFactory getChannelFactory() { + final TranslogWriter.ChannelFactory factory = super.getChannelFactory(); + + return new TranslogWriter.ChannelFactory() { + @Override + public FileChannel open(Path file) throws IOException { + FileChannel channel = factory.open(file); + return new FilterFileChannel(channel) { + + @Override + public int write(ByteBuffer src) throws IOException { + if (simulateDiskFull.get()) { + if (src.limit() > 1) { + final int pos = src.position(); + final int limit = src.limit(); + src.limit(limit / 2); + super.write(src); + src.position(pos); + src.limit(limit); + throw new IOException("__FAKE__ no space left on device"); + } + } + return super.write(src); + } + }; + } + }; + } + }; + + List locations = new ArrayList<>(); + int opsSynced = 0; + int opsAdded = 0; + boolean failed = false; + while(failed == false) { + try { + locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8"))))); + opsAdded++; + translog.sync(); + opsSynced++; + } catch (IOException ex) { + failed = true; + assertFalse(translog.isOpen()); + assertEquals("__FAKE__ no space left on device", ex.getMessage()); + } + simulateDiskFull.set(randomBoolean()); + } + simulateDiskFull.set(false); + if (randomBoolean()) { + try { + locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8"))))); + fail("we are already closed"); + } catch (AlreadyClosedException ex) { + assertNotNull(ex.getCause()); + assertEquals(ex.getCause().getMessage(), "__FAKE__ no space left on device"); + } + + } + Translog.TranslogGeneration translogGeneration = translog.getGeneration(); + try { + translog.newSnapshot(); + fail("already closed"); + } catch (AlreadyClosedException ex) { + // all is well + assertNotNull(ex.getCause()); + assertSame(translog.getTragicException(), ex.getCause()); + } + + try { + translog.commit(); + fail("already closed"); + } catch (AlreadyClosedException ex) { + assertNotNull(ex.getCause()); + assertSame(translog.getTragicException(), ex.getCause()); + } + + assertFalse(translog.isOpen()); + translog.close(); // we are closed + config.setTranslogGeneration(translogGeneration); + try (Translog tlog = new Translog(config)){ + assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration()); + assertFalse(tlog.syncNeeded()); + + try (Translog.Snapshot snapshot = tlog.newSnapshot()) { + assertEquals(opsSynced, snapshot.estimatedTotalOperations()); + for (int i = 0; i < opsSynced; i++) { + assertEquals("expected operation" + i + " to be in the previous translog but wasn't", tlog.currentFileGeneration() - 1, locations.get(i).generation); + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null", next); + assertEquals(i, Integer.parseInt(next.getSource().source.toUtf8())); + } + } + } + } + + public void testTranslogOpsCountIsCorrect() throws IOException { + List locations = new ArrayList<>(); + int numOps = randomIntBetween(100, 200); + LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly + for (int opsAdded = 0; opsAdded < numOps; opsAdded++) { + locations.add(translog.add(new Translog.Index("test", "" + opsAdded, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))))); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertEquals(opsAdded+1, snapshot.estimatedTotalOperations()); + for (int i = 0; i < opsAdded; i++) { + assertEquals("expected operation" + i + " to be in the current translog but wasn't", translog.currentFileGeneration(), locations.get(i).generation); + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null", next); + } + } + } + } }