NIFI-1574: Ensure that we never flush a BufferedOutputStream's buffer on close of the write-ahead log

This commit is contained in:
Mark Payne 2016-02-28 10:08:28 -05:00
parent 1149bc61cb
commit 62333c9e0a
2 changed files with 208 additions and 28 deletions

View File

@ -689,38 +689,6 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
}
public void close() {
final DataOutputStream out = dataOut;
if (out != null) {
try {
out.close();
} catch (final Exception e) {
}
}
this.closed = true;
this.dataOut = null;
}
public void blackList() {
lock.lock();
try {
blackListed = true;
} finally {
lock.unlock();
}
logger.debug("Blacklisted {}", this);
}
/**
* Closes resources pointing to the current journal and begins writing
* to a new one
*
* @throws IOException if failure to rollover
*/
public void rollover() throws IOException {
lock.lock();
try {
// Note that here we are closing fileOut and NOT dataOut.
// This is very much intentional, not an oversight. This is done because of
// the way that the OutputStreams are structured. dataOut wraps a BufferedOutputStream,
@ -748,6 +716,40 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor
// leave arbitrary data in the BufferedOutputStream that hasn't been flushed to the underlying
// FileOutputStream.
final OutputStream out = fileOut;
if (out != null) {
try {
out.close();
} catch (final Exception e) {
}
}
this.closed = true;
this.dataOut = null;
this.fileOut = null;
}
public void blackList() {
lock.lock();
try {
blackListed = true;
} finally {
lock.unlock();
}
logger.debug("Blacklisted {}", this);
}
/**
* Closes resources pointing to the current journal and begins writing
* to a new one
*
* @throws IOException if failure to rollover
*/
public void rollover() throws IOException {
lock.lock();
try {
// Note that here we are closing fileOut and NOT dataOut. See the note in the close()
// method to understand the logic behind this.
final OutputStream out = fileOut;
if (out != null) {
try {
out.close();

View File

@ -20,9 +20,14 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@ -417,6 +422,162 @@ public class TestMinimalLockingWriteAheadLog {
}
@Test
public void testShutdownWhileBlacklisted() throws IOException {
final Path path = Paths.get("target/minimal-locking-repo-shutdown-blacklisted");
deleteRecursively(path.toFile());
Files.createDirectories(path);
final SerDe<SimpleRecord> failOnThirdWriteSerde = new SerDe<SimpleRecord>() {
private int writes = 0;
@Override
public void serializeEdit(SimpleRecord previousRecordState, SimpleRecord newRecordState, DataOutputStream out) throws IOException {
serializeRecord(newRecordState, out);
}
@Override
public void serializeRecord(SimpleRecord record, DataOutputStream out) throws IOException {
int size = (int) record.getSize();
out.writeLong(record.getSize());
for (int i = 0; i < size; i++) {
out.write('A');
}
if (++writes == 3) {
throw new IOException("Intentional Exception for Unit Testing");
}
out.writeLong(record.getId());
}
@Override
public SimpleRecord deserializeEdit(DataInputStream in, Map<Object, SimpleRecord> currentRecordStates, int version) throws IOException {
return deserializeRecord(in, version);
}
@Override
public SimpleRecord deserializeRecord(DataInputStream in, int version) throws IOException {
long size = in.readLong();
for (int i = 0; i < (int) size; i++) {
in.read();
}
long id = in.readLong();
return new SimpleRecord(id, size);
}
@Override
public Object getRecordIdentifier(SimpleRecord record) {
return record.getId();
}
@Override
public UpdateType getUpdateType(SimpleRecord record) {
return UpdateType.CREATE;
}
@Override
public String getLocation(SimpleRecord record) {
return null;
}
@Override
public int getVersion() {
return 0;
}
};
final WriteAheadRepository<SimpleRecord> writeRepo = new MinimalLockingWriteAheadLog<>(path, 1, failOnThirdWriteSerde, null);
final Collection<SimpleRecord> initialRecs = writeRepo.recoverRecords();
assertTrue(initialRecs.isEmpty());
writeRepo.update(Collections.singleton(new SimpleRecord(1L, 1L)), false);
writeRepo.update(Collections.singleton(new SimpleRecord(2L, 2L)), false);
try {
// Use a size of 8194 because the BufferedOutputStream has a buffer size of 8192 and we want
// to exceed this for testing purposes.
writeRepo.update(Collections.singleton(new SimpleRecord(3L, 8194L)), false);
Assert.fail("Expected IOException but did not get it");
} catch (final IOException ioe) {
// expected behavior
}
final Path partitionDir = path.resolve("partition-0");
final File journalFile = partitionDir.toFile().listFiles()[0];
final long journalFileSize = journalFile.length();
verifyBlacklistedJournalContents(journalFile, failOnThirdWriteSerde);
writeRepo.shutdown();
// Ensure that calling shutdown() didn't write anything to the journal file
final long newJournalSize = journalFile.length();
assertEquals("Calling Shutdown wrote " + (newJournalSize - journalFileSize) + " bytes to the journal file", newJournalSize, journalFile.length());
}
private void verifyBlacklistedJournalContents(final File journalFile, final SerDe<?> serde) throws IOException {
try (final FileInputStream fis = new FileInputStream(journalFile);
final InputStream bis = new BufferedInputStream(fis);
final DataInputStream in = new DataInputStream(bis)) {
// Verify header info.
final String waliClassName = in.readUTF();
assertEquals(MinimalLockingWriteAheadLog.class.getName(), waliClassName);
final int waliVersion = in.readInt();
assertTrue(waliVersion > 0);
final String serdeClassName = in.readUTF();
assertEquals(serde.getClass().getName(), serdeClassName);
final int serdeVersion = in.readInt();
assertEquals(serde.getVersion(), serdeVersion);
for (int i = 0; i < 2; i++) {
long transactionId = in.readLong();
assertEquals(i, transactionId);
// read what serde wrote
long size = in.readLong();
assertEquals((i + 1), size);
for (int j = 0; j < (int) size; j++) {
final int c = in.read();
assertEquals('A', c);
}
long id = in.readLong();
assertEquals((i + 1), id);
int transactionIndicator = in.read();
assertEquals(2, transactionIndicator);
}
long transactionId = in.readLong();
assertEquals(2L, transactionId);
long thirdSize = in.readLong();
assertEquals(8194, thirdSize);
// should be 8176 A's because we threw an Exception after writing 8194 of them,
// but the BufferedOutputStream's buffer already had 8 bytes on it for the
// transaction id and the size.
for (int i = 0; i < 8176; i++) {
final int c = in.read();
assertEquals("i = " + i, 'A', c);
}
// Stream should now be out of data, because we threw an Exception!
final int nextByte = in.read();
assertEquals(-1, nextByte);
}
}
@Test
public void testDecreaseNumberOfPartitions() throws IOException {
@ -544,4 +705,21 @@ public class TestMinimalLockingWriteAheadLog {
return size;
}
static class SimpleRecord {
private long id;
private long size;
public SimpleRecord(final long id, final long size) {
this.id = id;
this.size = size;
}
public long getId() {
return id;
}
public long getSize() {
return size;
}
}
}