NIFI-3678: Ensure that we catch EOFException when reading header information from WAL Partition files; previously, we caught EOFExceptions when reading a 'record' from the WAL but not when reading header info

NIFI-3678: If we have a transaction ID but then have no more data written to the Partition file, we end up with an NPE. Added logic to avoid this and instead return null for the next record when this happens

This closes #1656.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2017-04-06 15:57:11 -04:00 committed by Bryan Bende
parent 6a75ab1740
commit 292dd1d66b
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
3 changed files with 105 additions and 15 deletions

View File

@ -973,24 +973,28 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
logger.debug("{} recovering from {}", this, nextRecoveryPath);
recoveryIn = createDataInputStream(nextRecoveryPath);
if (hasMoreData(recoveryIn)) {
final String waliImplementationClass = recoveryIn.readUTF();
if (!MinimalLockingWriteAheadLog.class.getName().equals(waliImplementationClass)) {
continue;
}
try {
final String waliImplementationClass = recoveryIn.readUTF();
if (!MinimalLockingWriteAheadLog.class.getName().equals(waliImplementationClass)) {
continue;
}
final long waliVersion = recoveryIn.readInt();
if (waliVersion > writeAheadLogVersion) {
throw new IOException("Cannot recovery from file " + nextRecoveryPath + " because it was written using "
final long waliVersion = recoveryIn.readInt();
if (waliVersion > writeAheadLogVersion) {
throw new IOException("Cannot recovery from file " + nextRecoveryPath + " because it was written using "
+ "WALI version " + waliVersion + ", but the version used to restore it is only " + writeAheadLogVersion);
}
final String serdeEncoding = recoveryIn.readUTF();
this.recoveryVersion = recoveryIn.readInt();
serde = serdeFactory.createSerDe(serdeEncoding);
serde.readHeader(recoveryIn);
break;
} catch (final Exception e) {
logger.warn("Failed to recover data from Write-Ahead Log for {} because the header information could not be read properly. "
+ "This often is the result of the file not being fully written out before the application is restarted. This file will be ignored.", nextRecoveryPath);
}
final String serdeEncoding = recoveryIn.readUTF();
this.recoveryVersion = recoveryIn.readInt();
serde = serdeFactory.createSerDe(serdeEncoding);
serde.readHeader(recoveryIn);
break;
}
}

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
@ -41,6 +42,7 @@ import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
@ -53,6 +55,86 @@ public class TestMinimalLockingWriteAheadLog {
private static final Logger logger = LoggerFactory.getLogger(TestMinimalLockingWriteAheadLog.class);
/**
 * Exercises recovery when reading a partition header fails with an {@link EOFException}.
 * <p>
 * The SerDe used here throws an {@code EOFException} from its second {@code readHeader} invocation and
 * succeeds on every other one. A repository is created, recovered (expected to be empty), updated three
 * times and shut down; a second repository over the same directory is then recovered. The test has no
 * assertion on the second recovery's result: it passes as long as that recovery does not throw.
 */
@Test
public void testTruncatedPartitionHeader() throws IOException {
    final int partitionCount = 4;
    final Path repoDir = Paths.get("target/testTruncatedPartitionHeader");

    // Start from a clean, freshly created directory.
    final File repoDirFile = repoDir.toFile();
    deleteRecursively(repoDirFile);
    assertTrue(repoDirFile.mkdirs());

    // Counts readHeader invocations so that exactly the second one can be made to fail.
    final AtomicInteger headerReads = new AtomicInteger(0);

    final SerDe<Object> serde = new SerDe<Object>() {
        @Override
        public void readHeader(DataInputStream in) throws IOException {
            final int priorCalls = headerReads.getAndIncrement();
            if (priorCalls == 1) {
                throw new EOFException("Intentionally thrown for unit test");
            }
        }

        @Override
        public void serializeEdit(Object previousRecordState, Object newRecordState, DataOutputStream out) throws IOException {
            out.write(1);
        }

        @Override
        public void serializeRecord(Object record, DataOutputStream out) throws IOException {
            out.write(1);
        }

        @Override
        public Object deserializeEdit(DataInputStream in, Map<Object, Object> currentRecordStates, int version) throws IOException {
            return readMarker(in);
        }

        @Override
        public Object deserializeRecord(DataInputStream in, int version) throws IOException {
            return readMarker(in);
        }

        // Consumes one byte; a value of 1 yields a fresh record, anything else (including end-of-stream) yields null.
        private Object readMarker(final DataInputStream in) throws IOException {
            if (in.read() == 1) {
                return new Object();
            }
            return null;
        }

        @Override
        public Object getRecordIdentifier(Object record) {
            return 1;
        }

        @Override
        public UpdateType getUpdateType(Object record) {
            return UpdateType.CREATE;
        }

        @Override
        public String getLocation(Object record) {
            return null;
        }

        @Override
        public int getVersion() {
            return 0;
        }
    };

    // First repository: nothing to recover yet, then write three single-record updates.
    final WriteAheadRepository<Object> firstRepo = new MinimalLockingWriteAheadLog<>(repoDir, partitionCount, serde, (SyncListener) null);
    try {
        assertTrue(firstRepo.recoverRecords().isEmpty());

        for (int i = 0; i < 3; i++) {
            firstRepo.update(Collections.singletonList(new Object()), false);
        }
    } finally {
        firstRepo.shutdown();
    }

    // Second repository over the same directory: recovery must complete without throwing.
    final WriteAheadRepository<Object> recoveringRepo = new MinimalLockingWriteAheadLog<>(repoDir, partitionCount, serde, (SyncListener) null);
    try {
        recoveringRepo.recoverRecords();
    } finally {
        recoveringRepo.shutdown();
    }
}
@Test
@Ignore("for local testing only")
public void testUpdatePerformance() throws IOException, InterruptedException {

View File

@ -113,6 +113,10 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement
public RepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
final SchemaRecordReader reader = SchemaRecordReader.fromSchema(recoverySchema);
final Record updateRecord = reader.readRecord(in);
if (updateRecord == null) {
// null may be returned by reader.readRecord() if it encounters end-of-stream
return null;
}
// Top level is always going to be a "Repository Record Update" record because we need a 'Union' type record at the
// top level that indicates which type of record we have.