From 62333c9e0ae77854761cf019a65f54ccedc7af6b Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Sun, 28 Feb 2016 10:08:28 -0500 Subject: [PATCH] NIFI-1574: Ensure that we never flush a BufferedOutputStream's buffer on close of the write-ahead log --- .../org/wali/MinimalLockingWriteAheadLog.java | 58 +++--- .../wali/TestMinimalLockingWriteAheadLog.java | 178 ++++++++++++++++++ 2 files changed, 208 insertions(+), 28 deletions(-) diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java index 501c3304d7..f20f9174cb 100644 --- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java @@ -689,17 +689,43 @@ public final class MinimalLockingWriteAheadLog implements WriteAheadRepositor } public void close() { - final DataOutputStream out = dataOut; + // Note that here we are closing fileOut and NOT dataOut. + // This is very much intentional, not an oversight. This is done because of + // the way that the OutputStreams are structured. dataOut wraps a BufferedOutputStream, + // which then wraps the FileOutputStream. If we close 'dataOut', then this will call + // the flush() method of BufferedOutputStream. Under normal conditions, this is fine. + // However, there is a very important corner case to consider: + // + // If we are writing to the DataOutputStream in the update() method and that + // call to write() then results in the BufferedOutputStream calling flushBuffer() - + // or if we finish the call to update() and call flush() ourselves - it is possible + // that the internal buffer of the BufferedOutputStream can get partially written + // to the FileOutputStream and then an IOException occurs. If this occurs, we have + // written a partial record to disk. 
This still is okay, as we have logic to handle + // the condition where we have a partial record and then an unexpected End-of-File. + // But if we then call close() on 'dataOut', this will call the flush() method of the + // underlying BufferedOutputStream. As a result, we will end up again writing the internal + // buffer of the BufferedOutputStream to the underlying file. At this point, we are left + // not with an unexpected/premature End-of-File but instead a bunch of seemingly random + // bytes that happened to be residing in that internal buffer, and this will result in + // a corrupt and unrecoverable Write-Ahead Log. + // + // Additionally, we are okay not ever calling close on the wrapping BufferedOutputStream and + // DataOutputStream because they don't actually hold any resources that need to be reclaimed, + // and after each update to the Write-Ahead Log, we call flush() ourselves to ensure that we don't + // leave arbitrary data in the BufferedOutputStream that hasn't been flushed to the underlying + // FileOutputStream. + final OutputStream out = fileOut; if (out != null) { try { out.close(); } catch (final Exception e) { - } } this.closed = true; this.dataOut = null; + this.fileOut = null; } public void blackList() { @@ -721,32 +747,8 @@ public final class MinimalLockingWriteAheadLog implements WriteAheadRepositor public void rollover() throws IOException { lock.lock(); try { - // Note that here we are closing fileOut and NOT dataOut. - // This is very much intentional, not an oversight. This is done because of - // the way that the OutputStreams are structured. dataOut wraps a BufferedOutputStream, - // which then wraps the FileOutputStream. If we close 'dataOut', then this will call - // the flush() method of BufferedOutputStream. Under normal conditions, this is fine. 
- // However, there is a very important corner case to consider: - // - // If we are writing to the DataOutputStream in the update() method and that - // call to write() then results in the BufferedOutputStream calling flushBuffer() - - // or if we finish the call to update() and call flush() ourselves - it is possible - // that the internal buffer of the BufferedOutputStream can get partially written to - // to the FileOutputStream and then an IOException occurs. If this occurs, we have - // written a partial record to disk. This still is okay, as we have logic to handle - // the condition where we have a partial record and then an unexpected End-of-File. - // But if we then call close() on 'dataOut', this will call the flush() method of the - // underlying BufferedOutputStream. As a result, we will end up again writing the internal - // buffer of the BufferedOutputStream to the underlying file. At this point, we are left - // not with an unexpected/premature End-of-File but instead a bunch of seemingly random - // bytes that happened to be residing in that internal buffer, and this will result in - // a corrupt and unrecoverable Write-Ahead Log. - // - // Additionally, we are okay not ever calling close on the wrapping BufferedOutputStream and - // DataOutputStream because they don't actually hold any resources that need to be reclaimed, - // and after each update to the Write-Ahead Log, we call flush() ourselves to ensure that we don't - // leave arbitrary data in the BufferedOutputStream that hasn't been flushed to the underlying - // FileOutputStream. + // Note that here we are closing fileOut and NOT dataOut. See the note in the close() + // method to understand the logic behind this. 
final OutputStream out = fileOut; if (out != null) { try { diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java index 03e6581131..7b7d2cacbe 100644 --- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java +++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java @@ -20,9 +20,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.BufferedInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.File; import java.io.FileFilter; +import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -417,6 +422,162 @@ public class TestMinimalLockingWriteAheadLog { } + @Test + public void testShutdownWhileBlacklisted() throws IOException { + final Path path = Paths.get("target/minimal-locking-repo-shutdown-blacklisted"); + deleteRecursively(path.toFile()); + Files.createDirectories(path); + + final SerDe failOnThirdWriteSerde = new SerDe() { + private int writes = 0; + + @Override + public void serializeEdit(SimpleRecord previousRecordState, SimpleRecord newRecordState, DataOutputStream out) throws IOException { + serializeRecord(newRecordState, out); + } + + @Override + public void serializeRecord(SimpleRecord record, DataOutputStream out) throws IOException { + int size = (int) record.getSize(); + out.writeLong(record.getSize()); + + for (int i = 0; i < size; i++) { + out.write('A'); + } + + if (++writes == 3) { + throw new IOException("Intentional Exception for Unit Testing"); + } + + out.writeLong(record.getId()); + } + + @Override + public SimpleRecord deserializeEdit(DataInputStream in, Map 
currentRecordStates, int version) throws IOException { + return deserializeRecord(in, version); + } + + @Override + public SimpleRecord deserializeRecord(DataInputStream in, int version) throws IOException { + long size = in.readLong(); + + for (int i = 0; i < (int) size; i++) { + in.read(); + } + + long id = in.readLong(); + return new SimpleRecord(id, size); + } + + @Override + public Object getRecordIdentifier(SimpleRecord record) { + return record.getId(); + } + + @Override + public UpdateType getUpdateType(SimpleRecord record) { + return UpdateType.CREATE; + } + + @Override + public String getLocation(SimpleRecord record) { + return null; + } + + @Override + public int getVersion() { + return 0; + } + }; + + final WriteAheadRepository writeRepo = new MinimalLockingWriteAheadLog<>(path, 1, failOnThirdWriteSerde, null); + final Collection initialRecs = writeRepo.recoverRecords(); + assertTrue(initialRecs.isEmpty()); + + + writeRepo.update(Collections.singleton(new SimpleRecord(1L, 1L)), false); + writeRepo.update(Collections.singleton(new SimpleRecord(2L, 2L)), false); + try { + // Use a size of 8194 because the BufferedOutputStream has a buffer size of 8192 and we want + // to exceed this for testing purposes. 
+ writeRepo.update(Collections.singleton(new SimpleRecord(3L, 8194L)), false); + Assert.fail("Expected IOException but did not get it"); + } catch (final IOException ioe) { + // expected behavior + } + + final Path partitionDir = path.resolve("partition-0"); + final File journalFile = partitionDir.toFile().listFiles()[0]; + final long journalFileSize = journalFile.length(); + verifyBlacklistedJournalContents(journalFile, failOnThirdWriteSerde); + + writeRepo.shutdown(); + + // Ensure that calling shutdown() didn't write anything to the journal file + final long newJournalSize = journalFile.length(); + assertEquals("Calling Shutdown wrote " + (newJournalSize - journalFileSize) + " bytes to the journal file", journalFileSize, newJournalSize); + } + + private void verifyBlacklistedJournalContents(final File journalFile, final SerDe serde) throws IOException { + try (final FileInputStream fis = new FileInputStream(journalFile); + final InputStream bis = new BufferedInputStream(fis); + final DataInputStream in = new DataInputStream(bis)) { + + // Verify header info. 
+ final String waliClassName = in.readUTF(); + assertEquals(MinimalLockingWriteAheadLog.class.getName(), waliClassName); + + final int waliVersion = in.readInt(); + assertTrue(waliVersion > 0); + + final String serdeClassName = in.readUTF(); + assertEquals(serde.getClass().getName(), serdeClassName); + + final int serdeVersion = in.readInt(); + assertEquals(serde.getVersion(), serdeVersion); + + for (int i = 0; i < 2; i++) { + long transactionId = in.readLong(); + assertEquals(i, transactionId); + + // read what serde wrote + long size = in.readLong(); + + assertEquals((i + 1), size); + + for (int j = 0; j < (int) size; j++) { + final int c = in.read(); + assertEquals('A', c); + } + + long id = in.readLong(); + assertEquals((i + 1), id); + + int transactionIndicator = in.read(); + assertEquals(2, transactionIndicator); + } + + long transactionId = in.readLong(); + assertEquals(2L, transactionId); + + long thirdSize = in.readLong(); + assertEquals(8194, thirdSize); + + // should be 8176 A's because we threw an Exception after writing 8194 of them, + // but the BufferedOutputStream's buffer already had 16 bytes on it for the + // transaction id and the size (8 bytes each), and 8192 - 16 = 8176. + for (int i = 0; i < 8176; i++) { + final int c = in.read(); + assertEquals("i = " + i, 'A', c); + } + + // Stream should now be out of data, because we threw an Exception! + final int nextByte = in.read(); + assertEquals(-1, nextByte); + } + } + + @Test public void testDecreaseNumberOfPartitions() throws IOException { @@ -544,4 +705,21 @@ public class TestMinimalLockingWriteAheadLog { return size; } + static class SimpleRecord { + private final long id; + private final long size; + + public SimpleRecord(final long id, final long size) { + this.id = id; + this.size = size; + } + + public long getId() { + return id; + } + + public long getSize() { + return size; + } + } }