diff --git a/core/src/main/java/org/elasticsearch/index/translog/BufferingTranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/BufferingTranslogWriter.java index 6026468973a..2a68d34282d 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/BufferingTranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/BufferingTranslogWriter.java @@ -48,13 +48,17 @@ public final class BufferingTranslogWriter extends TranslogWriter { public Translog.Location add(BytesReference data) throws IOException { try (ReleasableLock lock = writeLock.acquire()) { ensureOpen(); - operationCounter++; final long offset = totalOffset; if (data.length() >= buffer.length) { flush(); // we use the channel to write, since on windows, writing to the RAF might not be reflected // when reading through the channel - data.writeTo(channel); + try { + data.writeTo(channel); + } catch (Throwable ex) { + closeWithTragicEvent(ex); + throw ex; + } writtenOffset += data.length(); totalOffset += data.length(); return new Translog.Location(generation, offset, data.length()); @@ -64,6 +68,7 @@ public final class BufferingTranslogWriter extends TranslogWriter { } data.writeTo(bufferOs); totalOffset += data.length(); + operationCounter++; return new Translog.Location(generation, offset, data.length()); } } @@ -71,10 +76,17 @@ public final class BufferingTranslogWriter extends TranslogWriter { protected final void flush() throws IOException { assert writeLock.isHeldByCurrentThread(); if (bufferCount > 0) { + ensureOpen(); // we use the channel to write, since on windows, writing to the RAF might not be reflected // when reading through the channel - Channels.writeToChannel(buffer, 0, bufferCount, channel); - writtenOffset += bufferCount; + final int bufferSize = bufferCount; + try { + Channels.writeToChannel(buffer, 0, bufferSize, channel); + } catch (Throwable ex) { + closeWithTragicEvent(ex); + throw ex; + } + writtenOffset += bufferSize; bufferCount = 0; } } 
@@ -97,7 +109,7 @@ public final class BufferingTranslogWriter extends TranslogWriter { } @Override - public boolean syncNeeded() { + public synchronized boolean syncNeeded() { return totalOffset != lastSyncedOffset; } @@ -107,15 +119,24 @@ public final class BufferingTranslogWriter extends TranslogWriter { return; } synchronized (this) { + ensureOpen(); channelReference.incRef(); try { + final long offsetToSync; try (ReleasableLock lock = writeLock.acquire()) { flush(); - lastSyncedOffset = totalOffset; + offsetToSync = totalOffset; } // we can do this outside of the write lock but we have to protect from // concurrent syncs - checkpoint(lastSyncedOffset, operationCounter, channelReference); + try { + ensureOpen(); + checkpoint(offsetToSync, operationCounter, channelReference); + } catch (IOException ex) { + closeWithTragicEvent(ex); + throw ex; + } + lastSyncedOffset = offsetToSync; } finally { channelReference.decRef(); } diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 35dd895bc2e..394d7c75ef8 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -115,7 +115,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC private final Path location; private TranslogWriter current; private volatile ImmutableTranslogReader currentCommittingTranslog; - private long lastCommittedTranslogFileGeneration = -1; // -1 is safe as it will not cause an translog deletion. + private volatile long lastCommittedTranslogFileGeneration = -1; // -1 is safe as it will not cause a translog deletion. 
private final AtomicBoolean closed = new AtomicBoolean(); private final TranslogConfig config; private final String translogUUID; @@ -288,10 +288,14 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC if (closed.compareAndSet(false, true)) { try (ReleasableLock lock = writeLock.acquire()) { try { - IOUtils.close(current, currentCommittingTranslog); + current.sync(); } finally { - IOUtils.close(recoveredTranslogs); - recoveredTranslogs.clear(); + try { + IOUtils.close(current, currentCommittingTranslog); + } finally { + IOUtils.close(recoveredTranslogs); + recoveredTranslogs.clear(); + } } } finally { FutureUtils.cancel(syncScheduler); @@ -354,7 +358,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC TranslogWriter createWriter(long fileGeneration) throws IOException { TranslogWriter newFile; try { - newFile = TranslogWriter.create(config.getType(), shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), new OnCloseRunnable(), config.getBufferSize()); + newFile = TranslogWriter.create(config.getType(), shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), new OnCloseRunnable(), config.getBufferSize(), getChannelFactory()); } catch (IOException e) { throw new TranslogException(shardId, "failed to create new translog file", e); } @@ -548,31 +552,29 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC private final class OnCloseRunnable implements Callback { @Override public void handle(ChannelReference channelReference) { - try (ReleasableLock lock = writeLock.acquire()) { - if (isReferencedGeneration(channelReference.getGeneration()) == false) { - Path translogPath = channelReference.getPath(); - assert channelReference.getPath().getParent().equals(location) : "translog files must be in the location folder: " + location + " but was: " + translogPath; - // if the given translogPath is not the current we can 
safely delete the file since all references are released - logger.trace("delete translog file - not referenced and not current anymore {}", translogPath); - IOUtils.deleteFilesIgnoringExceptions(translogPath); - IOUtils.deleteFilesIgnoringExceptions(translogPath.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration()))); + if (isReferencedGeneration(channelReference.getGeneration()) == false) { + Path translogPath = channelReference.getPath(); + assert channelReference.getPath().getParent().equals(location) : "translog files must be in the location folder: " + location + " but was: " + translogPath; + // if the given translogPath is not the current we can safely delete the file since all references are released + logger.trace("delete translog file - not referenced and not current anymore {}", translogPath); + IOUtils.deleteFilesIgnoringExceptions(translogPath); + IOUtils.deleteFilesIgnoringExceptions(translogPath.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration()))); - } - try (DirectoryStream stream = Files.newDirectoryStream(location)) { - for (Path path : stream) { - Matcher matcher = PARSE_STRICT_ID_PATTERN.matcher(path.getFileName().toString()); - if (matcher.matches()) { - long generation = Long.parseLong(matcher.group(1)); - if (isReferencedGeneration(generation) == false) { - logger.trace("delete translog file - not referenced and not current anymore {}", path); - IOUtils.deleteFilesIgnoringExceptions(path); - IOUtils.deleteFilesIgnoringExceptions(path.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration()))); - } + } + try (DirectoryStream stream = Files.newDirectoryStream(location)) { + for (Path path : stream) { + Matcher matcher = PARSE_STRICT_ID_PATTERN.matcher(path.getFileName().toString()); + if (matcher.matches()) { + long generation = Long.parseLong(matcher.group(1)); + if (isReferencedGeneration(generation) == false) { + logger.trace("delete translog file - not referenced and not 
current anymore {}", path); + IOUtils.deleteFilesIgnoringExceptions(path); + IOUtils.deleteFilesIgnoringExceptions(path.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration()))); } } - } catch (IOException e) { - logger.warn("failed to delete unreferenced translog files", e); } + } catch (IOException e) { + logger.warn("failed to delete unreferenced translog files", e); } } } @@ -1400,4 +1402,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return outstandingViews.size(); } + TranslogWriter.ChannelFactory getChannelFactory() { + return TranslogWriter.ChannelFactory.DEFAULT; + } + } diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java index 590bc319057..d7077fd90ad 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogReader.java @@ -140,16 +140,16 @@ public abstract class TranslogReader implements Closeable, Comparable onClose, int bufferSize) throws IOException { + public static TranslogWriter create(Type type, ShardId shardId, String translogUUID, long fileGeneration, Path file, Callback onClose, int bufferSize, ChannelFactory channelFactory) throws IOException { final BytesRef ref = new BytesRef(translogUUID); final int headerLength = CodecUtil.headerLength(TRANSLOG_CODEC) + ref.length + RamUsageEstimator.NUM_BYTES_INT; - final FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW); + final FileChannel channel = channelFactory.open(file); try { // This OutputStreamDataOutput is intentionally not closed because // closing it will close the FileChannel @@ -118,6 +121,18 @@ public class TranslogWriter extends TranslogReader { } } + protected final void closeWithTragicEvent(Throwable throwable) throws IOException { + try (ReleasableLock 
lock = writeLock.acquire()) { + if (throwable != null) { + if (tragicEvent == null) { + tragicEvent = throwable; + } else { + tragicEvent.addSuppressed(throwable); + } + } + close(); + } + } /** * add the given bytes to the translog and return the location they were written at @@ -127,9 +142,14 @@ public class TranslogWriter extends TranslogReader { try (ReleasableLock lock = writeLock.acquire()) { ensureOpen(); position = writtenOffset; - data.writeTo(channel); + try { + data.writeTo(channel); + } catch (Throwable e) { + closeWithTragicEvent(e); + throw e; + } writtenOffset = writtenOffset + data.length(); - operationCounter = operationCounter + 1; + operationCounter++;; } return new Translog.Location(generation, position, data.length()); } @@ -147,6 +167,7 @@ public class TranslogWriter extends TranslogReader { // check if we really need to sync here... if (syncNeeded()) { try (ReleasableLock lock = writeLock.acquire()) { + ensureOpen(); lastSyncedOffset = writtenOffset; checkpoint(lastSyncedOffset, operationCounter, channelReference); } @@ -262,15 +283,6 @@ public class TranslogWriter extends TranslogReader { return false; } - @Override - protected final void doClose() throws IOException { - try (ReleasableLock lock = writeLock.acquire()) { - sync(); - } finally { - super.doClose(); - } - } - @Override protected void readBytes(ByteBuffer buffer, long position) throws IOException { try (ReleasableLock lock = readLock.acquire()) { @@ -288,4 +300,20 @@ public class TranslogWriter extends TranslogReader { Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation); Checkpoint.write(checkpointFile, checkpoint, options); } + + static class ChannelFactory { + + static final ChannelFactory DEFAULT = new ChannelFactory(); + + // only for testing until we have a disk-full FileSystem + public FileChannel open(Path file) throws IOException { + return FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.READ, 
StandardOpenOption.CREATE_NEW); + } + } + + protected final void ensureOpen() { + if (isClosed()) { + throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed", tragicEvent); + } + } } diff --git a/core/src/test/java/org/elasticsearch/index/translog/BufferedTranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/BufferedTranslogTests.java index aab980e975d..a29cc6cf8d0 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/BufferedTranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/BufferedTranslogTests.java @@ -34,13 +34,12 @@ import java.nio.file.Path; public class BufferedTranslogTests extends TranslogTests { @Override - protected Translog create(Path path) throws IOException { + protected TranslogConfig getTranslogConfig(Path path) { Settings build = Settings.settingsBuilder() .put("index.translog.fs.type", TranslogWriter.Type.BUFFERED.name()) .put("index.translog.fs.buffer_size", 10 + randomInt(128 * 1024), ByteSizeUnit.BYTES) .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) .build(); - TranslogConfig translogConfig = new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null); - return new Translog(translogConfig); + return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null); } } diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 26faa02a17d..d90c3257add 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.index.translog; import 
com.carrotsearch.randomizedtesting.generators.RandomPicks; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.Term; +import org.apache.lucene.mockfile.FilterFileChannel; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.ByteArrayDataOutput; import org.apache.lucene.util.IOUtils; @@ -110,13 +111,16 @@ public class TranslogTests extends ESTestCase { } } - protected Translog create(Path path) throws IOException { + private Translog create(Path path) throws IOException { + return new Translog(getTranslogConfig(path)); + } + + protected TranslogConfig getTranslogConfig(Path path) { Settings build = Settings.settingsBuilder() .put(TranslogConfig.INDEX_TRANSLOG_FS_TYPE, TranslogWriter.Type.SIMPLE.name()) .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) .build(); - TranslogConfig translogConfig = new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null); - return new Translog(translogConfig); + return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null); } protected void addToTranslogAndList(Translog translog, ArrayList list, Translog.Operation op) { @@ -1279,4 +1283,108 @@ public class TranslogTests extends ESTestCase { } } } + + public void testFailFlush() throws IOException { + Path tempDir = createTempDir(); + final AtomicBoolean failWrite = new AtomicBoolean(); + final AtomicBoolean simulateDiskFull = new AtomicBoolean(); + TranslogConfig config = getTranslogConfig(tempDir); + Translog translog = new Translog(config) { + @Override + TranslogWriter.ChannelFactory getChannelFactory() { + final TranslogWriter.ChannelFactory factory = super.getChannelFactory(); + + return new TranslogWriter.ChannelFactory() { + @Override + public FileChannel open(Path file) throws 
IOException { + FileChannel channel = factory.open(file); + return new FilterFileChannel(channel) { + + @Override + public int write(ByteBuffer src) throws IOException { + if (failWrite.get()) { + throw new IOException("boom"); + } + if (simulateDiskFull.get()) { + if (src.limit() > 1) { + final int pos = src.position(); + final int limit = src.limit(); + src.limit(limit / 2); + super.write(src); + src.position(pos); + src.limit(limit); + throw new IOException("no space left on device"); + } + } + return super.write(src); + } + }; + } + }; + } + }; + + List locations = new ArrayList<>(); + int opsSynced = 0; + int opsAdded = 0; + boolean failed = false; + boolean syncFailed = true; + while(failed == false) { + try { + locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8"))))); + opsAdded++; + translog.sync(); + opsSynced++; + } catch (IOException ex) { + failed = true; + assertEquals("no space left on device", ex.getMessage()); + } catch (Exception ex) { + failed = true; + assertTrue(ex.toString(), ex.getMessage().startsWith("Failed to write operation")); + } + simulateDiskFull.set(randomBoolean()); + } + simulateDiskFull.set(false); + if (randomBoolean()) { + try { + locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8"))))); + opsSynced++; + } catch (AlreadyClosedException ex) { + assertNotNull(ex.getCause()); + assertEquals(ex.getCause().getMessage(), "no space left on device"); + } + + } + Translog.TranslogGeneration translogGeneration = translog.getGeneration(); + try { + translog.newSnapshot(); + fail("already closed"); + } catch (AlreadyClosedException ex) { + // all is well + } + + try { + translog.close(); + if (opsAdded != opsSynced) { + fail("already closed"); + } + } catch (AlreadyClosedException ex) { + assertNotNull(ex.getCause()); + } + config.setTranslogGeneration(translogGeneration); + try 
(Translog tlog = new Translog(config)){ + assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration()); + assertFalse(tlog.syncNeeded()); + + try (Translog.Snapshot snapshot = tlog.newSnapshot()) { + assertEquals(opsSynced, snapshot.estimatedTotalOperations()); + for (int i = 0; i < opsSynced; i++) { + assertEquals("expected operation" + i + " to be in the previous translog but wasn't", tlog.currentFileGeneration() - 1, locations.get(i).generation); + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null", next); + assertEquals(i, Integer.parseInt(next.getSource().source.toUtf8())); + } + } + } + } }