diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index a105f652728..fd5c64f96ac 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -424,6 +424,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC closeOnTragicEvent(ex); throw ex; } catch (Throwable e) { + closeOnTragicEvent(e); throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e); } finally { Releasables.close(out.bytes()); @@ -500,7 +501,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC if (closed.get() == false) { current.sync(); } - } catch (AlreadyClosedException | IOException ex) { + } catch (Throwable ex) { closeOnTragicEvent(ex); throw ex; } @@ -533,7 +534,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC ensureOpen(); return current.syncUpTo(location.translogLocation + location.size); } - } catch (AlreadyClosedException | IOException ex) { + } catch (Throwable ex) { closeOnTragicEvent(ex); throw ex; } diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 51de00f74a6..8b3294c15b8 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -63,7 +63,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Predicate; import static org.hamcrest.Matchers.*; @@ -1387,6 +1386,35 @@ public class TranslogTests extends ESTestCase { } } + public void testTragicEventCanBeAnyException() throws IOException { + Path tempDir = createTempDir(); + final AtomicBoolean fail = new AtomicBoolean(); + TranslogConfig config = getTranslogConfig(tempDir); + assumeFalse("this won't work if we sync on any op",config.isSyncOnEachOperation()); + Translog translog = getFailableTranslog(fail, config, false, true); + LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly + translog.add(new Translog.Index("test", "1", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))); + fail.set(true); + try { + Translog.Location location = translog.add(new Translog.Index("test", "2", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))); + if (config.getType() == TranslogWriter.Type.BUFFERED) { // the buffered case will fail on the add if we exceed the buffer or will fail on the flush once we sync + if (randomBoolean()) { + translog.ensureSynced(location); + } else { + translog.sync(); + } + } + //TODO once we have a mock FS that can simulate we can also fail on plain sync + fail("WTF"); + } catch (UnknownException ex) { + // w00t + } catch (TranslogException ex) { + assertTrue(ex.getCause() instanceof UnknownException); + } + assertFalse(translog.isOpen()); + assertTrue(translog.getTragicException() instanceof UnknownException); + } + public void testFatalIOExceptionsWhileWritingConcurrently() throws IOException, InterruptedException { Path tempDir = createTempDir(); final AtomicBoolean fail = new AtomicBoolean(false); @@ -1432,9 +1460,9 @@ public class TranslogTests extends ESTestCase { } boolean atLeastOneFailed = false; for (Throwable ex : threadExceptions) { + assertTrue(ex.toString(), ex instanceof IOException || ex instanceof AlreadyClosedException); if (ex != null) { atLeastOneFailed = true; - break; } } if (atLeastOneFailed == false) { @@ -1477,8 +1505,11 @@ public class TranslogTests extends ESTestCase { } } } - private Translog getFailableTranslog(final AtomicBoolean fail, final TranslogConfig config) throws IOException { + return getFailableTranslog(fail, config, randomBoolean(), false); + } + + private Translog getFailableTranslog(final AtomicBoolean fail, final TranslogConfig config, final boolean paritalWrites, final boolean throwUnknownException) throws IOException { return new Translog(config) { @Override TranslogWriter.ChannelFactory getChannelFactory() { @@ -1488,7 +1519,7 @@ public class TranslogTests extends ESTestCase { @Override public FileChannel open(Path file) throws IOException { FileChannel channel = factory.open(file); - return new ThrowingFileChannel(fail, randomBoolean(), channel); + return new ThrowingFileChannel(fail, paritalWrites, throwUnknownException, channel); } }; } @@ -1498,11 +1529,13 @@ public class TranslogTests extends ESTestCase { public static class ThrowingFileChannel extends FilterFileChannel { private final AtomicBoolean fail; private final boolean partialWrite; + private final boolean throwUnknownException; - public ThrowingFileChannel(AtomicBoolean fail, boolean partialWrite, FileChannel delegate) { + public ThrowingFileChannel(AtomicBoolean fail, boolean partialWrite, boolean throwUnknownException, FileChannel delegate) { super(delegate); this.fail = fail; this.partialWrite = partialWrite; + this.throwUnknownException = throwUnknownException; } @Override @@ -1519,19 +1552,27 @@ public class TranslogTests extends ESTestCase { public int write(ByteBuffer src) throws IOException { if (fail.get()) { if (partialWrite) { - if (src.limit() > 1) { + if (src.hasRemaining()) { final int pos = src.position(); final int limit = src.limit(); - src.limit(limit / 2); + src.limit(randomIntBetween(pos, limit)); super.write(src); - src.position(pos); src.limit(limit); + src.position(pos); throw new IOException("__FAKE__ no space left on device"); } } - throw new MockDirectoryWrapper.FakeIOException(); + if (throwUnknownException) { + throw new UnknownException(); + } else { + throw new MockDirectoryWrapper.FakeIOException(); + } } return super.write(src); } } + + private static final class UnknownException extends RuntimeException { + + } }