NIFI-3273 This closes #1611. Handle the case of trailing NUL bytes in MinimalLockingWriteAheadLog

This commit is contained in:
Mark Payne 2017-04-17 13:35:10 -04:00 committed by joewitt
parent 0207f21ce4
commit 0f2ac39f69
3 changed files with 219 additions and 13 deletions

View File

@ -663,8 +663,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
* @param <S> type of record held in the partitions
*/
private static class Partition<S> {
public static final String JOURNAL_EXTENSION = ".journal";
private static final int NUL_BYTE = 0;
private static final Pattern JOURNAL_FILENAME_PATTERN = Pattern.compile("\\d+\\.journal");
private final SerDeFactory<S> serdeFactory;
@ -1013,6 +1013,17 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
transactionId = recoveryIn.readLong();
} catch (final EOFException e) {
continue;
} catch (final Exception e) {
// If the stream consists solely of NUL bytes, then we want to treat it
// the same as an EOF because we see this happen when we suddenly lose power
// while writing to a file.
if (remainingBytesAllNul(recoveryIn)) {
logger.warn("Failed to recover data from Write-Ahead Log Partition because encountered trailing NUL bytes. "
+ "This will sometimes happen after a sudden power loss. The rest of this journal file will be skipped for recovery purposes.");
continue;
} else {
throw e;
}
}
this.maxTransactionId.set(transactionId);
@ -1020,6 +1031,27 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
}
}
/**
* In the case of a sudden power loss, it is common - at least in a Linux journaling File System -
* that the partition file that is being written to will have many trailing "NUL bytes" (0's).
* If this happens, then on restart we want to treat this as an incomplete transaction, so we detect
* this case explicitly.
*
* @param in the input stream to scan
* @return <code>true</code> if the InputStream contains no data or contains only NUL bytes
* @throws IOException if unable to read from the given InputStream
*/
private boolean remainingBytesAllNul(final InputStream in) throws IOException {
int nextByte;
while ((nextByte = in.read()) != -1) {
if (nextByte != NUL_BYTE) {
return false;
}
}
return true;
}
private boolean hasMoreData(final InputStream in) throws IOException {
in.mark(1);
final int nextByte = in.read();
@ -1059,7 +1091,40 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
int transactionFlag;
do {
final S record = serde.deserializeEdit(recoveryIn, currentRecordMap, recoveryVersion);
final S record;
try {
record = serde.deserializeEdit(recoveryIn, currentRecordMap, recoveryVersion);
} catch (final EOFException eof) {
throw eof;
} catch (final Exception e) {
// If the stream consists solely of NUL bytes, then we want to treat it
// the same as an EOF because we see this happen when we suddenly lose power
// while writing to a file. We also have logic already in the caller of this
// method to properly handle EOFException's, so we will simply throw an EOFException
// ourselves. However, if that is not the case, then something else has gone wrong.
// In such a case, there is not much that we can do. If we simply skip over the transaction,
// then the transaction may be indicating that a new attribute was added or changed. Or the
// content of the FlowFile changed. A subsequent transaction for the same FlowFile may then
// update the connection that is holding the FlowFile. In this case, if we simply skip over
// the transaction, we end up with a FlowFile in a queue that has the wrong attributes or
// content, and that can result in some very bad behavior - even security vulnerabilities if
// a Route processor, for instance, routes incorrectly due to a missing attribute or content
// is pointing to a previous claim where sensitive values have not been removed, etc. So
// instead of attempting to skip the transaction and move on, we instead just throw the Exception
// indicating that the write-ahead log is corrupt and allow the user to handle it as he/she sees
// fit (likely this will result in deleting the repo, but it's possible that it could be repaired
// manually or through some sort of script).
if (remainingBytesAllNul(recoveryIn)) {
final EOFException eof = new EOFException("Failed to recover data from Write-Ahead Log Partition because encountered trailing NUL bytes. "
+ "This will sometimes happen after a sudden power loss. The rest of this journal file will be skipped for recovery purposes.");
eof.addSuppressed(e);
throw eof;
} else {
throw e;
}
}
if (logger.isDebugEnabled()) {
logger.debug("{} Recovering Transaction {}: {}", new Object[] { this, maxTransactionId.get(), record });
}

View File

@ -18,13 +18,11 @@ package org.wali;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Map;
public class DummyRecordSerde implements SerDe<DummyRecord> {
public static final int NUM_UPDATE_TYPES = UpdateType.values().length;
private int throwIOEAfterNserializeEdits = -1;
private int throwOOMEAfterNserializeEdits = -1;
private int serializeEditCount = 0;
@ -38,7 +36,7 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
throw new OutOfMemoryError("Serialized " + (serializeEditCount - 1) + " records successfully, so now it's time to throw OOME");
}
out.write(record.getUpdateType().ordinal());
out.writeUTF(record.getUpdateType().name());
out.writeUTF(record.getId());
if (record.getUpdateType() != UpdateType.DELETE) {
@ -58,14 +56,8 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
@Override
public DummyRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
final int index = in.read();
if (index < 0) {
throw new EOFException();
}
if (index >= NUM_UPDATE_TYPES) {
throw new IOException("Corrupt stream; got UpdateType value of " + index + " but there are only " + NUM_UPDATE_TYPES + " valid values");
}
final UpdateType updateType = UpdateType.values()[index];
final String updateTypeName = in.readUTF();
final UpdateType updateType = UpdateType.valueOf(updateTypeName);
final String id = in.readUTF();
final DummyRecord record = new DummyRecord(id, updateType);

View File

@ -27,12 +27,15 @@ import java.io.EOFException;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@ -457,6 +460,152 @@ public class TestMinimalLockingWriteAheadLog {
assertTrue(record3);
}
@Test
public void testRecoverFileThatHasTrailingNULBytesAndTruncation() throws IOException {
final int numPartitions = 5;
final Path path = Paths.get("target/testRecoverFileThatHasTrailingNULBytes");
deleteRecursively(path.toFile());
Files.createDirectories(path);
final DummyRecordSerde serde = new DummyRecordSerde();
final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
final Collection<DummyRecord> initialRecs = repo.recoverRecords();
assertTrue(initialRecs.isEmpty());
final List<DummyRecord> firstTransaction = new ArrayList<>();
firstTransaction.add(new DummyRecord("1", UpdateType.CREATE));
firstTransaction.add(new DummyRecord("2", UpdateType.CREATE));
firstTransaction.add(new DummyRecord("3", UpdateType.CREATE));
final List<DummyRecord> secondTransaction = new ArrayList<>();
secondTransaction.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123"));
secondTransaction.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123"));
secondTransaction.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123"));
final List<DummyRecord> thirdTransaction = new ArrayList<>();
thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE));
thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE));
repo.update(firstTransaction, true);
repo.update(secondTransaction, true);
repo.update(thirdTransaction, true);
repo.shutdown();
final File partition3Dir = path.resolve("partition-2").toFile();
final File journalFile = partition3Dir.listFiles()[0];
final byte[] contents = Files.readAllBytes(journalFile.toPath());
// Truncate the contents of the journal file by 8 bytes. Then replace with 28 trailing NUL bytes,
// as this is what we often see when we have a sudden power loss.
final byte[] truncated = Arrays.copyOfRange(contents, 0, contents.length - 8);
final byte[] withNuls = new byte[truncated.length + 28];
System.arraycopy(truncated, 0, withNuls, 0, truncated.length);
try (final OutputStream fos = new FileOutputStream(journalFile)) {
fos.write(withNuls);
}
final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
final Collection<DummyRecord> recoveredRecords = recoverRepo.recoverRecords();
assertFalse(recoveredRecords.isEmpty());
assertEquals(3, recoveredRecords.size());
boolean record1 = false, record2 = false, record3 = false;
for (final DummyRecord record : recoveredRecords) {
switch (record.getId()) {
case "1":
record1 = true;
assertEquals("123", record.getProperty("abc"));
break;
case "2":
record2 = true;
assertEquals("123", record.getProperty("cba"));
break;
case "3":
record3 = true;
assertEquals("123", record.getProperty("aaa"));
break;
}
}
assertTrue(record1);
assertTrue(record2);
assertTrue(record3);
}
@Test
public void testRecoverFileThatHasTrailingNULBytesNoTruncation() throws IOException {
final int numPartitions = 5;
final Path path = Paths.get("target/testRecoverFileThatHasTrailingNULBytes");
deleteRecursively(path.toFile());
Files.createDirectories(path);
final DummyRecordSerde serde = new DummyRecordSerde();
final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
final Collection<DummyRecord> initialRecs = repo.recoverRecords();
assertTrue(initialRecs.isEmpty());
final List<DummyRecord> firstTransaction = new ArrayList<>();
firstTransaction.add(new DummyRecord("1", UpdateType.CREATE));
firstTransaction.add(new DummyRecord("2", UpdateType.CREATE));
firstTransaction.add(new DummyRecord("3", UpdateType.CREATE));
final List<DummyRecord> secondTransaction = new ArrayList<>();
secondTransaction.add(new DummyRecord("1", UpdateType.UPDATE).setProperty("abc", "123"));
secondTransaction.add(new DummyRecord("2", UpdateType.UPDATE).setProperty("cba", "123"));
secondTransaction.add(new DummyRecord("3", UpdateType.UPDATE).setProperty("aaa", "123"));
final List<DummyRecord> thirdTransaction = new ArrayList<>();
thirdTransaction.add(new DummyRecord("1", UpdateType.DELETE));
thirdTransaction.add(new DummyRecord("2", UpdateType.DELETE));
repo.update(firstTransaction, true);
repo.update(secondTransaction, true);
repo.update(thirdTransaction, true);
repo.shutdown();
final File partition3Dir = path.resolve("partition-2").toFile();
final File journalFile = partition3Dir.listFiles()[0];
// Truncate the contents of the journal file by 8 bytes. Then replace with 28 trailing NUL bytes,
// as this is what we often see when we have a sudden power loss.
final byte[] withNuls = new byte[28];
try (final OutputStream fos = new FileOutputStream(journalFile, true)) {
fos.write(withNuls);
}
final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
final Collection<DummyRecord> recoveredRecords = recoverRepo.recoverRecords();
assertFalse(recoveredRecords.isEmpty());
assertEquals(1, recoveredRecords.size());
boolean record1 = false, record2 = false, record3 = false;
for (final DummyRecord record : recoveredRecords) {
switch (record.getId()) {
case "1":
record1 = record.getUpdateType() != UpdateType.DELETE;
assertEquals("123", record.getProperty("abc"));
break;
case "2":
record2 = record.getUpdateType() != UpdateType.DELETE;
assertEquals("123", record.getProperty("cba"));
break;
case "3":
record3 = true;
assertEquals("123", record.getProperty("aaa"));
break;
}
}
assertFalse(record1);
assertFalse(record2);
assertTrue(record3);
}
@Test
public void testCannotModifyLogAfterAllAreBlackListed() throws IOException {
final int numPartitions = 5;