diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java index 894907323d..0914a791df 100644 --- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java @@ -663,8 +663,8 @@ public final class MinimalLockingWriteAheadLog implements WriteAheadRepositor * @param type of record held in the partitions */ private static class Partition { - public static final String JOURNAL_EXTENSION = ".journal"; + private static final int NUL_BYTE = 0; private static final Pattern JOURNAL_FILENAME_PATTERN = Pattern.compile("\\d+\\.journal"); private final SerDeFactory serdeFactory; @@ -1013,6 +1013,17 @@ public final class MinimalLockingWriteAheadLog implements WriteAheadRepositor transactionId = recoveryIn.readLong(); } catch (final EOFException e) { continue; + } catch (final Exception e) { + // If the stream consists solely of NUL bytes, then we want to treat it + // the same as an EOF because we see this happen when we suddenly lose power + // while writing to a file. + if (remainingBytesAllNul(recoveryIn)) { + logger.warn("Failed to recover data from Write-Ahead Log Partition because encountered trailing NUL bytes. " + + "This will sometimes happen after a sudden power loss. The rest of this journal file will be skipped for recovery purposes."); + continue; + } else { + throw e; + } } this.maxTransactionId.set(transactionId); @@ -1020,6 +1031,27 @@ public final class MinimalLockingWriteAheadLog implements WriteAheadRepositor } } + /** + * In the case of a sudden power loss, it is common - at least in a Linux journaling File System - + * that the partition file that is being written to will have many trailing "NUL bytes" (0's). 
+ * If this happens, then on restart we want to treat this as an incomplete transaction, so we detect + * this case explicitly. + * + * @param in the input stream to scan + * @return true if the InputStream contains no data or contains only NUL bytes + * @throws IOException if unable to read from the given InputStream + */ + private boolean remainingBytesAllNul(final InputStream in) throws IOException { + int nextByte; + while ((nextByte = in.read()) != -1) { + if (nextByte != NUL_BYTE) { + return false; + } + } + + return true; + } + private boolean hasMoreData(final InputStream in) throws IOException { in.mark(1); final int nextByte = in.read(); @@ -1059,7 +1091,40 @@ public final class MinimalLockingWriteAheadLog implements WriteAheadRepositor int transactionFlag; do { - final S record = serde.deserializeEdit(recoveryIn, currentRecordMap, recoveryVersion); + final S record; + try { + record = serde.deserializeEdit(recoveryIn, currentRecordMap, recoveryVersion); + } catch (final EOFException eof) { + throw eof; + } catch (final Exception e) { + // If the stream consists solely of NUL bytes, then we want to treat it + // the same as an EOF because we see this happen when we suddenly lose power + // while writing to a file. We also have logic already in the caller of this + // method to properly handle EOFException's, so we will simply throw an EOFException + // ourselves. However, if that is not the case, then something else has gone wrong. + // In such a case, there is not much that we can do. If we simply skip over the transaction, + // then the transaction may be indicating that a new attribute was added or changed. Or the + // content of the FlowFile changed. A subsequent transaction for the same FlowFile may then + // update the connection that is holding the FlowFile. 
In this case, if we simply skip over + // the transaction, we end up with a FlowFile in a queue that has the wrong attributes or + // content, and that can result in some very bad behavior - even security vulnerabilities if + // a Route processor, for instance, routes incorrectly due to a missing attribute or content + // is pointing to a previous claim where sensitive values have not been removed, etc. So + // instead of attempting to skip the transaction and move on, we instead just throw the Exception + // indicating that the write-ahead log is corrupt and allow the user to handle it as he/she sees + // fit (likely this will result in deleting the repo, but it's possible that it could be repaired + // manually or through some sort of script). + if (remainingBytesAllNul(recoveryIn)) { + final EOFException eof = new EOFException("Failed to recover data from Write-Ahead Log Partition because encountered trailing NUL bytes. " + + "This will sometimes happen after a sudden power loss. The rest of this journal file will be skipped for recovery purposes."); + eof.addSuppressed(e); + throw eof; + } else { + throw e; + } + } + + if (logger.isDebugEnabled()) { logger.debug("{} Recovering Transaction {}: {}", new Object[] { this, maxTransactionId.get(), record }); } diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java index 3a4e79f96d..e9f3b0194f 100644 --- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java +++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java @@ -18,13 +18,11 @@ package org.wali; import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.EOFException; import java.io.IOException; import java.util.Map; public class DummyRecordSerde implements SerDe { - public static final int NUM_UPDATE_TYPES = UpdateType.values().length; private int 
throwIOEAfterNserializeEdits = -1; private int throwOOMEAfterNserializeEdits = -1; private int serializeEditCount = 0; @@ -38,7 +36,7 @@ public class DummyRecordSerde implements SerDe { throw new OutOfMemoryError("Serialized " + (serializeEditCount - 1) + " records successfully, so now it's time to throw OOME"); } - out.write(record.getUpdateType().ordinal()); + out.writeUTF(record.getUpdateType().name()); out.writeUTF(record.getId()); if (record.getUpdateType() != UpdateType.DELETE) { @@ -58,14 +56,8 @@ public class DummyRecordSerde implements SerDe { @Override public DummyRecord deserializeRecord(final DataInputStream in, final int version) throws IOException { - final int index = in.read(); - if (index < 0) { - throw new EOFException(); - } - if (index >= NUM_UPDATE_TYPES) { - throw new IOException("Corrupt stream; got UpdateType value of " + index + " but there are only " + NUM_UPDATE_TYPES + " valid values"); - } - final UpdateType updateType = UpdateType.values()[index]; + final String updateTypeName = in.readUTF(); + final UpdateType updateType = UpdateType.valueOf(updateTypeName); final String id = in.readUTF(); final DummyRecord record = new DummyRecord(id, updateType); diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java index cbca9684fe..ef33f57fe7 100644 --- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java +++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java @@ -27,12 +27,15 @@ import java.io.EOFException; import java.io.File; import java.io.FileFilter; import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; 
+import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -457,6 +460,152 @@ public class TestMinimalLockingWriteAheadLog { assertTrue(record3); } + + @Test + public void testRecoverFileThatHasTrailingNULBytesAndTruncation() throws IOException { + final int numPartitions = 5; + final Path path = Paths.get("target/testRecoverFileThatHasTrailingNULBytes"); + deleteRecursively(path.toFile()); + Files.createDirectories(path); + + final DummyRecordSerde serde = new DummyRecordSerde(); + final WriteAheadRepository repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null); + final Collection initialRecs = repo.recoverRecords(); + assertTrue(initialRecs.isEmpty()); + + final List firstTransaction = new ArrayList<>(); + firstTransaction.add(new DummyRecord("1", UpdateType.CREATE)); + firstTransaction.add(new DummyRecord("2", UpdateType.CREATE)); + firstTransaction.add(new DummyRecord("3", UpdateType.CREATE)); + + final List secondTransaction = new ArrayList<>(); + secondTransaction.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123")); + secondTransaction.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123")); + secondTransaction.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123")); + + final List thirdTransaction = new ArrayList<>(); + thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE)); + thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE)); + + repo.update(firstTransaction, true); + repo.update(secondTransaction, true); + repo.update(thirdTransaction, true); + + repo.shutdown(); + + final File partition3Dir = path.resolve("partition-2").toFile(); + final File journalFile = partition3Dir.listFiles()[0]; + final byte[] contents = Files.readAllBytes(journalFile.toPath()); + + // Truncate the contents of the journal file by 8 bytes. 
Then replace with 28 trailing NUL bytes, + // as this is what we often see when we have a sudden power loss. + final byte[] truncated = Arrays.copyOfRange(contents, 0, contents.length - 8); + final byte[] withNuls = new byte[truncated.length + 28]; + System.arraycopy(truncated, 0, withNuls, 0, truncated.length); + + try (final OutputStream fos = new FileOutputStream(journalFile)) { + fos.write(withNuls); + } + + final WriteAheadRepository recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null); + final Collection recoveredRecords = recoverRepo.recoverRecords(); + assertFalse(recoveredRecords.isEmpty()); + assertEquals(3, recoveredRecords.size()); + + boolean record1 = false, record2 = false, record3 = false; + for (final DummyRecord record : recoveredRecords) { + switch (record.getId()) { + case "1": + record1 = true; + assertEquals("123", record.getProperty("abc")); + break; + case "2": + record2 = true; + assertEquals("123", record.getProperty("cba")); + break; + case "3": + record3 = true; + assertEquals("123", record.getProperty("aaa")); + break; + } + } + + assertTrue(record1); + assertTrue(record2); + assertTrue(record3); + } + + @Test + public void testRecoverFileThatHasTrailingNULBytesNoTruncation() throws IOException { + final int numPartitions = 5; + final Path path = Paths.get("target/testRecoverFileThatHasTrailingNULBytes"); + deleteRecursively(path.toFile()); + Files.createDirectories(path); + + final DummyRecordSerde serde = new DummyRecordSerde(); + final WriteAheadRepository repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null); + final Collection initialRecs = repo.recoverRecords(); + assertTrue(initialRecs.isEmpty()); + + final List firstTransaction = new ArrayList<>(); + firstTransaction.add(new DummyRecord("1", UpdateType.CREATE)); + firstTransaction.add(new DummyRecord("2", UpdateType.CREATE)); + firstTransaction.add(new DummyRecord("3", UpdateType.CREATE)); + + final List secondTransaction = new 
ArrayList<>(); + secondTransaction.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123")); + secondTransaction.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123")); + secondTransaction.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123")); + + final List thirdTransaction = new ArrayList<>(); + thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE)); + thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE)); + + repo.update(firstTransaction, true); + repo.update(secondTransaction, true); + repo.update(thirdTransaction, true); + + repo.shutdown(); + + final File partition3Dir = path.resolve("partition-2").toFile(); + final File journalFile = partition3Dir.listFiles()[0]; + + // Leave the journal contents intact (no truncation) and append 28 trailing NUL bytes to the file, + // as this is what we often see when we have a sudden power loss. + final byte[] withNuls = new byte[28]; + + try (final OutputStream fos = new FileOutputStream(journalFile, true)) { + fos.write(withNuls); + } + + final WriteAheadRepository recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null); + final Collection recoveredRecords = recoverRepo.recoverRecords(); + assertFalse(recoveredRecords.isEmpty()); + assertEquals(1, recoveredRecords.size()); + + boolean record1 = false, record2 = false, record3 = false; + for (final DummyRecord record : recoveredRecords) { + switch (record.getId()) { + case "1": + record1 = record.getUpdateType() != UpdateType.DELETE; + assertEquals("123", record.getProperty("abc")); + break; + case "2": + record2 = record.getUpdateType() != UpdateType.DELETE; + assertEquals("123", record.getProperty("cba")); + break; + case "3": + record3 = true; + assertEquals("123", record.getProperty("aaa")); + break; + } + } + + assertFalse(record1); + assertFalse(record2); + assertTrue(record3); + } + + @Test public void testCannotModifyLogAfterAllAreBlackListed() throws IOException { final int 
numPartitions = 5;