diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 4016695dd89..a105f652728 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -158,7 +158,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC try { if (translogGeneration != null) { - final Checkpoint checkpoint = Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME)); + final Checkpoint checkpoint = readCheckpoint(); this.recoveredTranslogs = recoverFromFiles(translogGeneration, checkpoint); if (recoveredTranslogs.isEmpty()) { throw new IllegalStateException("at least one reader must be recovered"); @@ -421,13 +421,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return location; } } catch (AlreadyClosedException | IOException ex) { - if (current.getTragicException() != null) { - try { - close(); - } catch (Exception inner) { - ex.addSuppressed(inner); - } - } + closeOnTragicEvent(ex); throw ex; } catch (Throwable e) { throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e); @@ -507,13 +501,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC current.sync(); } } catch (AlreadyClosedException | IOException ex) { - if (current.getTragicException() != null) { - try { - close(); - } catch (Exception inner) { - ex.addSuppressed(inner); - } - } + closeOnTragicEvent(ex); throw ex; } } @@ -545,10 +533,23 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC ensureOpen(); return current.syncUpTo(location.translogLocation + location.size); } + } catch (AlreadyClosedException | IOException ex) { + closeOnTragicEvent(ex); + throw ex; } return false; } + private void closeOnTragicEvent(Throwable ex) { + if (current.getTragicException() != null) { + try { + close(); + } catch (Exception inner) { + ex.addSuppressed(inner); + } + } + } + /** * return stats */ @@ -1433,4 +1434,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return current.getTragicException(); } + /** Reads and returns the current checkpoint */ + final Checkpoint readCheckpoint() throws IOException { + return Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME)); + } + } diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index e35c04dcd6b..51de00f74a6 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -25,6 +25,7 @@ import org.apache.lucene.index.Term; import org.apache.lucene.mockfile.FilterFileChannel; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.ByteArrayDataOutput; +import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.LineFileDocs; import org.apache.lucene.util.LuceneTestCase; @@ -62,6 +63,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import static org.hamcrest.Matchers.*; @@ -1242,11 +1244,11 @@ public class TranslogTests extends ESTestCase { private final CountDownLatch downLatch; private final int opsPerThread; private final int threadId; - private final BlockingQueue writtenOperations; + private final Collection writtenOperations; private final Throwable[] threadExceptions; private final Translog translog; - public TranslogThread(Translog translog, CountDownLatch downLatch, int opsPerThread, int threadId, BlockingQueue writtenOperations, Throwable[] threadExceptions) { + public TranslogThread(Translog translog, CountDownLatch downLatch, int opsPerThread, int threadId, Collection writtenOperations, Throwable[] threadExceptions) { this.translog = translog; this.downLatch = downLatch; this.opsPerThread = opsPerThread; @@ -1276,76 +1278,58 @@ public class TranslogTests extends ESTestCase { throw new ElasticsearchException("not supported op type"); } - Translog.Location loc = translog.add(op); + Translog.Location loc = add(op); writtenOperations.add(new LocationOperation(op, loc)); + afterAdd(); } } catch (Throwable t) { threadExceptions[threadId] = t; } } + + protected Translog.Location add(Translog.Operation op) throws IOException { + return translog.add(op); + } + + protected void afterAdd() throws IOException {} } public void testFailFlush() throws IOException { Path tempDir = createTempDir(); - final AtomicBoolean simulateDiskFull = new AtomicBoolean(); + final AtomicBoolean fail = new AtomicBoolean(); TranslogConfig config = getTranslogConfig(tempDir); - Translog translog = new Translog(config) { - @Override - TranslogWriter.ChannelFactory getChannelFactory() { - final TranslogWriter.ChannelFactory factory = super.getChannelFactory(); - - return new TranslogWriter.ChannelFactory() { - @Override - public FileChannel open(Path file) throws IOException { - FileChannel channel = factory.open(file); - return new FilterFileChannel(channel) { - - @Override - public int write(ByteBuffer src) throws IOException { - if (simulateDiskFull.get()) { - if (src.limit() > 1) { - final int pos = src.position(); - final int limit = src.limit(); - src.limit(limit / 2); - super.write(src); - src.position(pos); - src.limit(limit); - throw new IOException("__FAKE__ no space left on device"); - } - } - return super.write(src); - } - }; - } - }; - } - }; + Translog translog = getFailableTranslog(fail, config); List locations = new ArrayList<>(); int opsSynced = 0; - int opsAdded = 0; boolean failed = false; while(failed == false) { try { locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8"))))); - opsAdded++; translog.sync(); opsSynced++; + } catch (MockDirectoryWrapper.FakeIOException ex) { + failed = true; + assertFalse(translog.isOpen()); } catch (IOException ex) { failed = true; assertFalse(translog.isOpen()); assertEquals("__FAKE__ no space left on device", ex.getMessage()); } - simulateDiskFull.set(randomBoolean()); + fail.set(randomBoolean()); } - simulateDiskFull.set(false); + fail.set(false); if (randomBoolean()) { try { locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8"))))); fail("we are already closed"); } catch (AlreadyClosedException ex) { assertNotNull(ex.getCause()); - assertEquals(ex.getCause().getMessage(), "__FAKE__ no space left on device"); + if (ex.getCause() instanceof MockDirectoryWrapper.FakeIOException) { + assertNull(ex.getCause().getMessage()); + } else { + assertEquals(ex.getCause().getMessage(), "__FAKE__ no space left on device"); + } } } @@ -1402,4 +1386,152 @@ public class TranslogTests extends ESTestCase { } } } + + public void testFatalIOExceptionsWhileWritingConcurrently() throws IOException, InterruptedException { + Path tempDir = createTempDir(); + final AtomicBoolean fail = new AtomicBoolean(false); + + TranslogConfig config = getTranslogConfig(tempDir); + Translog translog = getFailableTranslog(fail, config); + + final int threadCount = randomIntBetween(1, 5); + Thread[] threads = new Thread[threadCount]; + final Throwable[] threadExceptions = new Throwable[threadCount]; + final CountDownLatch downLatch = new CountDownLatch(1); + final CountDownLatch added = new CountDownLatch(randomIntBetween(10, 100)); + List writtenOperations = Collections.synchronizedList(new ArrayList<>()); + for (int i = 0; i < threadCount; i++) { + final int threadId = i; + threads[i] = new TranslogThread(translog, downLatch, 200, threadId, writtenOperations, threadExceptions) { + @Override + protected Translog.Location add(Translog.Operation op) throws IOException { + Translog.Location add = super.add(op); + added.countDown(); + return add; + } + + @Override + protected void afterAdd() throws IOException { + if (randomBoolean()) { + translog.sync(); + } + } + }; + threads[i].setDaemon(true); + threads[i].start(); + } + downLatch.countDown(); + added.await(); + try (Translog.View view = translog.newView()) { + // this holds a reference to the current tlog channel such that it's not closed + // if we hit a tragic event. this is important to ensure that asserts inside the Translog#add doesn't trip + // otherwise our assertions here are off by one sometimes. + fail.set(true); + for (int i = 0; i < threadCount; i++) { + threads[i].join(); + } + boolean atLeastOneFailed = false; + for (Throwable ex : threadExceptions) { + if (ex != null) { + atLeastOneFailed = true; + break; + } + } + if (atLeastOneFailed == false) { + try { + boolean syncNeeded = translog.syncNeeded(); + translog.close(); + assertFalse("should have failed if sync was needed", syncNeeded); + } catch (IOException ex) { + // boom now we failed + } + } + Collections.sort(writtenOperations, (a, b) -> a.location.compareTo(b.location)); + assertFalse(translog.isOpen()); + final Checkpoint checkpoint = Checkpoint.read(config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME)); + Iterator iterator = writtenOperations.iterator(); + while (iterator.hasNext()) { + LocationOperation next = iterator.next(); + if (checkpoint.offset < (next.location.translogLocation + next.location.size)) { + // drop all that haven't been synced + iterator.remove(); + } + } + config.setTranslogGeneration(translog.getGeneration()); + try (Translog tlog = new Translog(config)) { + try (Translog.Snapshot snapshot = tlog.newSnapshot()) { + if (writtenOperations.size() != snapshot.estimatedTotalOperations()) { + for (int i = 0; i < threadCount; i++) { + if (threadExceptions[i] != null) + threadExceptions[i].printStackTrace(); + } + } + assertEquals(writtenOperations.size(), snapshot.estimatedTotalOperations()); + for (int i = 0; i < writtenOperations.size(); i++) { + assertEquals("expected operation" + i + " to be in the previous translog but wasn't", tlog.currentFileGeneration() - 1, writtenOperations.get(i).location.generation); + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null", next); + assertEquals(next, writtenOperations.get(i).operation); + } + } + } + } + } + + private Translog getFailableTranslog(final AtomicBoolean fail, final TranslogConfig config) throws IOException { + return new Translog(config) { + @Override + TranslogWriter.ChannelFactory getChannelFactory() { + final TranslogWriter.ChannelFactory factory = super.getChannelFactory(); + + return new TranslogWriter.ChannelFactory() { + @Override + public FileChannel open(Path file) throws IOException { + FileChannel channel = factory.open(file); + return new ThrowingFileChannel(fail, randomBoolean(), channel); + } + }; + } + }; + } + + public static class ThrowingFileChannel extends FilterFileChannel { + private final AtomicBoolean fail; + private final boolean partialWrite; + + public ThrowingFileChannel(AtomicBoolean fail, boolean partialWrite, FileChannel delegate) { + super(delegate); + this.fail = fail; + this.partialWrite = partialWrite; + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int write(ByteBuffer src, long position) throws IOException { + throw new UnsupportedOperationException(); + } + + + public int write(ByteBuffer src) throws IOException { + if (fail.get()) { + if (partialWrite) { + if (src.limit() > 1) { + final int pos = src.position(); + final int limit = src.limit(); + src.limit(limit / 2); + super.write(src); + src.position(pos); + src.limit(limit); + throw new IOException("__FAKE__ no space left on device"); + } + } + throw new MockDirectoryWrapper.FakeIOException(); + } + return super.write(src); + } + } }