NIFI-6410: Addressed race condition in LengthDelimitedJournal in which a Thread could throw an Exception, then another Thread could update the Journal before the first thread closes it. Added unit test to replicate.

This closes #3561
This commit is contained in:
Mark Payne 2019-07-01 15:02:41 -04:00 committed by Matt Gilman
parent 41663929a4
commit 40dcd1577b
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
2 changed files with 186 additions and 10 deletions

View File

@ -301,17 +301,27 @@ public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
final long transactionId;
synchronized (this) {
transactionId = currentTransactionId++;
transactionCount++;
checkState();
transactionPreamble.clear();
transactionPreamble.putLong(transactionId);
transactionPreamble.putInt(baos.size());
try {
transactionId = currentTransactionId++;
transactionCount++;
out.write(TRANSACTION_FOLLOWS);
out.write(transactionPreamble.array());
baos.writeTo(out);
out.flush();
transactionPreamble.clear();
transactionPreamble.putLong(transactionId);
transactionPreamble.putInt(baos.size());
out.write(TRANSACTION_FOLLOWS);
out.write(transactionPreamble.array());
baos.writeTo(out);
out.flush();
} catch (final Throwable t) {
// While the outter Throwable that wraps this "catch" will call Poison, it is imperative that we call poison()
// before the synchronized block is excited. Otherwise, another thread could potentially corrupt the journal before
// the poison method closes the file.
poison(t);
throw t;
}
}
logger.debug("Wrote Transaction {} to journal {} with length {} and {} records", transactionId, journalFile, baos.size(), records.size());
@ -343,7 +353,7 @@ public class LengthDelimitedJournal<T> implements WriteAheadJournal<T> {
}
}
private void poison(final Throwable t) {
protected void poison(final Throwable t) {
this.poisoned = true;
try {

View File

@ -26,6 +26,8 @@ import org.wali.SerDeFactory;
import org.wali.SingletonSerDeFactory;
import org.wali.UpdateType;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@ -42,6 +44,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
@ -419,4 +422,167 @@ public class TestLengthDelimitedJournal {
assertEquals(secondRecord, retrieved2);
}
}
/**
* This test is rather complicated and creates a lot of odd objects with Thread.sleep, etc., and it may not be at-all clear what is happening. The intent of this
* test is to try to cause a race condition to occur that would cause the journal to become corrupt. Consider the following scenario:
*
* 1. Thread 1 attempts to update the Journal. In the #update method, it checks the state of the Journal, which is healthy.
* 2. Thread 2 attempts to update the Journal. In the #update method, it checks the state of the Journal, which is healthy.
* 3. Thread 1 now throws an Exception when attempt to write to disk (in this case, an IOException but could also be an OutOfMemoryError, etc.) The Exception is caught by the Journal,
* and the #poisoni method is called. Before the #poison method is able to close the underlying Output Stream, Thread 2 is able to write to the Output Stream (which is no longer guarded
* by Thread 1 because of the Exception that was just thrown).
* 4. Thread 2 writes to the Journal, after Thread 1 had written only a partial update. The repository has now become corrupt.
* 5. Thread 1 closes the file handle, but does so too late. Corruption has already occurred!
*
* In order to replicate the above series of steps in a unit test, we need to introduce some oddities.
*
* - The #poison method, when called, needs to wait a bit before actually closing the underlying FileOutputStream, so that Thread 2 has a chance to run after Thread 1 stop guarding the Output
* Stream and before Thread 1 closes the Output Stream. (This is handled in the test by subclassing the Journal and overriding the #poison method to pause).
* - We need to ensure that Thread 1 and Thread 2 are both able to pass the #checkState check in #update before the corruption occurs. (This is handled in the test by the 'pausingBados' object,
* which will enter the #update method, then pause before returning the ByteArrayOutputStream, which essentially yields to Thread 1, the 'corrupting thread').
* - After both threads have passed the #checkState check, we need Thread 1 to write a partial update, then throw an Exception (This is handled in the test by the 'corruptingBados' object).
* - After the Exception is thrown, we need Thread 2 to update the contents of the repository before the file is closed. (This is handled in the test by subclassing the Journal and calling
* Thread.sleep in the #poison method).
*
*/
@Test
public void testFailureDuringWriteCannotCauseCorruption() throws IOException, InterruptedException {
// Create a ByteArrayDataOutputStream such that when attempting to write the data to another OutputStream via its ByteArrayOutputStream,
// the BADOS will copy 5 bytes, then throw an IOException. This should result in the journal being poisoned such that it can no longer
// be written to.
final ByteArrayDataOutputStream corruptingBados = new ByteArrayDataOutputStream(4096) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream() {
@Override
public synchronized void writeTo(final OutputStream out) throws IOException {
out.write(buf, 0, 5);
throw new IOException("Intentional Exception in Unit Test designed to cause corruption");
}
@Override
public synchronized String toString() {
return "Corrupting ByteArrayOutputStream[" + super.toString() + "]";
}
};
@Override
public ByteArrayOutputStream getByteArrayOutputStream() {
return baos;
}
@Override
public DataOutputStream getDataOutputStream() {
return new DataOutputStream(baos);
}
};
// Create a ByteArrayDataOutputStream such that when attempting to write the data to another OutputStream via its ByteArrayOutputStream,
// the BADOS will sleep for 1 second before writing. This allwos other threads to trigger corruption in the repo in the meantime.
final ByteArrayDataOutputStream pausingBados = new ByteArrayDataOutputStream(4096) {
private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
private int count = 0;
@Override
public ByteArrayOutputStream getByteArrayOutputStream() {
// Pause only on the second iteration.
if (count++ == 1) {
try {
Thread.sleep(1000L);
} catch (final InterruptedException ie) {
}
}
return baos;
}
@Override
public DataOutputStream getDataOutputStream() {
return new DataOutputStream(baos);
}
};
final Supplier<ByteArrayDataOutputStream> badosSupplier = new Supplier<ByteArrayDataOutputStream>() {
private int count = 0;
@Override
public ByteArrayDataOutputStream get() {
if (count++ == 0) {
return pausingBados;
}
return corruptingBados;
}
};
final ObjectPool<ByteArrayDataOutputStream> corruptingStreamPool = new BlockingQueuePool<>(2,
badosSupplier,
stream -> true,
stream -> stream.getByteArrayOutputStream().reset());
final Thread[] threads = new Thread[2];
final LengthDelimitedJournal<DummyRecord> journal = new LengthDelimitedJournal<DummyRecord>(journalFile, serdeFactory, corruptingStreamPool, 0L) {
private int count = 0;
@Override
protected void poison(final Throwable t) {
if (count++ == 0) { // it is only important that we sleep the first time. If we sleep every time, it just slows the test down.
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
}
}
super.poison(t);
}
};
final DummyRecord firstRecord, secondRecord;
try {
journal.writeHeader();
firstRecord = new DummyRecord("1", UpdateType.CREATE);
secondRecord = new DummyRecord("2", UpdateType.CREATE);
final Thread t1 = new Thread(() -> {
try {
journal.update(Collections.singleton(firstRecord), key -> null);
} catch (final IOException ioe) {
}
});
final Thread t2 = new Thread(() -> {
try {
journal.update(Collections.singleton(secondRecord), key -> firstRecord);
} catch (final IOException ioe) {
}
});
threads[0] = t1;
threads[1] = t2;
t1.start();
t2.start();
for (Thread thread : threads) {
thread.join();
}
} finally {
journal.close();
}
// Now, attempt to read from the Journal to ensure that it is not corrupt.
try (final LengthDelimitedJournal<DummyRecord> recoveryJournal = new LengthDelimitedJournal<>(journalFile, serdeFactory, corruptingStreamPool, 0L)) {
final Map<Object, DummyRecord> recordMap = new HashMap<>();
final Set<String> swapLocations = new HashSet<>();
recoveryJournal.recoverRecords(recordMap, swapLocations);
assertEquals(0, recordMap.size());
}
}
}