NIFI-800: Ensured that any Throwable thrown while updating a Partition marks the Partition as unusable until a checkpoint occurs

commit 34e08ba775
parent 496ebfb3be
Author: Mark Payne
Date: 2015-08-03 08:43:18 -04:00

4 changed files with 238 additions and 8 deletions
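Context for the change below: OutOfMemoryError extends Error, not Exception, so the previous catch (final Exception e) blocks never blacklisted a partition when an update failed with an OOME. A minimal standalone sketch (not NiFi code; the class name is hypothetical) of why the catch clauses are widened to Throwable:

public class CatchWideningDemo {
    public static void main(final String[] args) {
        try {
            // simulate a partition update failing with an Error rather than an Exception
            throw new OutOfMemoryError("simulated");
        } catch (final Exception e) {
            // never reached: OutOfMemoryError is an Error, not an Exception
            System.out.println("handled as Exception");
        } catch (final Throwable t) {
            // reached: Throwable is the supertype of both Exception and Error,
            // so this clause also sees Errors and can blacklist the partition
            System.out.println("handled as Throwable: " + t.getMessage());
        }
    }
}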

MinimalLockingWriteAheadLog.java

@@ -60,7 +60,7 @@ import java.util.regex.Pattern;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
+ import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -226,10 +226,10 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepository<T> {
try {
partition.update(records, transactionId, unmodifiableRecordMap, forceSync);
- } catch (final Exception e) {
+ } catch (final Throwable t) {
partition.blackList();
numberBlackListedPartitions.incrementAndGet();
- throw e;
+ throw t;
}
if (forceSync && syncListener != null) {
@@ -511,9 +511,10 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepository<T> {
for (final Partition<T> partition : partitions) {
try {
partition.rollover();
- } catch (final IOException ioe) {
+ } catch (final Throwable t) {
partition.blackList();
- throw ioe;
+ numberBlackListedPartitions.getAndIncrement();
+ throw t;
}
}
@@ -878,7 +879,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepository<T> {
}
private DataInputStream createDataInputStream(final Path path) throws IOException {
- return new DataInputStream(new BufferedInputStream(Files.newInputStream(path)));
+ return new DataInputStream(new ByteCountingInputStream(new BufferedInputStream(Files.newInputStream(path))));
}
private DataInputStream getRecoveryStream() throws IOException {
@@ -892,6 +893,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepository<T> {
return null;
}
logger.debug("{} recovering from {}", this, nextRecoveryPath);
recoveryIn = createDataInputStream(nextRecoveryPath);
if (hasMoreData(recoveryIn)) {
final String waliImplementationClass = recoveryIn.readUTF();
@@ -972,8 +974,8 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepository<T> {
int transactionFlag;
do {
final S record = serde.deserializeEdit(recoveryIn, currentRecordMap, recoveryVersion);
- if (logger.isTraceEnabled()) {
- logger.trace("{} Recovering Transaction {}: {}", new Object[]{this, maxTransactionId.get(), record});
+ if (logger.isDebugEnabled()) {
+ logger.debug("{} Recovering Transaction {}: {}", new Object[] { this, maxTransactionId.get(), record });
}
final Object recordId = serde.getRecordIdentifier(record);

DummyRecord.java

@@ -58,4 +58,9 @@ public class DummyRecord {
public String getProperty(final String name) {
return props.get(name);
}
@Override
public String toString() {
return "DummyRecord [id=" + id + ", props=" + props + ", updateType=" + updateType + "]";
}
}

DummyRecordSerde.java

@@ -26,6 +26,7 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
public static final int NUM_UPDATE_TYPES = UpdateType.values().length;
private int throwIOEAfterNserializeEdits = -1;
private int throwOOMEAfterNserializeEdits = -1;
private int serializeEditCount = 0;
@Override
@@ -33,6 +34,9 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
if (throwIOEAfterNserializeEdits >= 0 && (serializeEditCount++ >= throwIOEAfterNserializeEdits)) {
throw new IOException("Serialized " + (serializeEditCount - 1) + " records successfully, so now it's time to throw IOE");
}
if (throwOOMEAfterNserializeEdits >= 0 && (serializeEditCount++ >= throwOOMEAfterNserializeEdits)) {
throw new OutOfMemoryError("Serialized " + (serializeEditCount - 1) + " records successfully, so now it's time to throw OOME");
}
out.write(record.getUpdateType().ordinal());
out.writeUTF(record.getId());
@@ -100,6 +104,10 @@ public class DummyRecordSerde implements SerDe<DummyRecord> {
this.throwIOEAfterNserializeEdits = n;
}
public void setThrowOOMEAfterNSerializeEdits(final int n) {
this.throwOOMEAfterNserializeEdits = n;
}
@Override
public String getLocation(final DummyRecord record) {
return null;

TestMinimalLockingWriteAheadLog.java

@@ -28,16 +28,180 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestMinimalLockingWriteAheadLog {
private static final Logger logger = LoggerFactory.getLogger(TestMinimalLockingWriteAheadLog.class);
@Test
public void testRepoDoesntContinuallyGrowOnOutOfMemoryError() throws IOException, InterruptedException {
final int numPartitions = 8;
final Path path = Paths.get("target/minimal-locking-repo");
deleteRecursively(path.toFile());
assertTrue(path.toFile().mkdirs());
final DummyRecordSerde serde = new DummyRecordSerde();
final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null);
try {
final Collection<DummyRecord> initialRecs = repo.recoverRecords();
assertTrue(initialRecs.isEmpty());
serde.setThrowOOMEAfterNSerializeEdits(100);
for (int i = 0; i < 108; i++) {
try {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
repo.update(Collections.singleton(record), false);
} catch (final OutOfMemoryError oome) {
logger.info("Received OOME on record " + i);
}
}
long expectedSize = sizeOf(path.toFile());
for (int i = 0; i < 1000; i++) {
try {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
repo.update(Collections.singleton(record), false);
Assert.fail("Expected IOE but it didn't happen");
} catch (final IOException ioe) {
// will get IOException because all Partitions have been blacklisted
}
}
long newSize = sizeOf(path.toFile());
assertEquals(expectedSize, newSize);
try {
repo.checkpoint();
Assert.fail("Expected OOME but it didn't happen");
} catch (final OutOfMemoryError oome) {
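// expected: checkpointing serializes the records again and hits the same OOME trigger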
}
expectedSize = sizeOf(path.toFile());
for (int i = 0; i < 100000; i++) {
try {
final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE);
repo.update(Collections.singleton(record), false);
Assert.fail("Expected IOE but it didn't happen");
} catch (final IOException ioe) {
// will get IOException because all Partitions have been blacklisted
}
}
newSize = sizeOf(path.toFile());
assertEquals(expectedSize, newSize);
} finally {
repo.shutdown();
}
}
/**
* This test is intended to continually update the Write-ahead log using many threads, then
* stop and restore the repository to check for any corruption. There were reports of potential threading
* issues leading to repository corruption. This was an attempt to replicate. It should not be run as a
* unit test, really, but will be left, as it can be valuable to exercise the implementation
*
* @throws IOException if unable to read from/write to the write-ahead log
* @throws InterruptedException if a thread is interrupted
*/
@Test
@Ignore
public void tryToCauseThreadingIssue() throws IOException, InterruptedException {
System.setProperty("org.slf4j.simpleLogger.log.org.wali", "INFO");
final int numThreads = 12;
final long iterationsPerThread = 1000000;
final int numAttempts = 1000;
final Path path = Paths.get("D:/dummy/minimal-locking-repo");
path.toFile().mkdirs();
final AtomicReference<WriteAheadRepository<DummyRecord>> writeRepoRef = new AtomicReference<>();
final AtomicBoolean checkpointing = new AtomicBoolean(false);
final Thread bgThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
checkpointing.set(true);
final WriteAheadRepository<DummyRecord> repo = writeRepoRef.get();
if (repo != null) {
try {
repo.checkpoint();
} catch (IOException e) {
e.printStackTrace();
Assert.fail();
}
}
checkpointing.set(false);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
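// ignore the interrupt and continue checkpointing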
}
}
}
});
bgThread.setDaemon(true);
bgThread.start();
for (int x = 0; x < numAttempts; x++) {
final DummyRecordSerde serde = new DummyRecordSerde();
final WriteAheadRepository<DummyRecord> writeRepo = new MinimalLockingWriteAheadLog<>(path, 256, serde, null);
final Collection<DummyRecord> writeRecords = writeRepo.recoverRecords();
for (final DummyRecord record : writeRecords) {
assertEquals("B", record.getProperty("A"));
}
writeRepoRef.set(writeRepo);
final Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++) {
final Thread t = new InlineCreationInsertThread(iterationsPerThread, writeRepo);
t.start();
threads[i] = t;
}
for (final Thread t : threads) {
t.join();
}
writeRepoRef.set(null);
writeRepo.shutdown();
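// wait for any in-flight checkpoint to finish before recovering with a new repository instance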
boolean cp = checkpointing.get();
while (cp) {
Thread.sleep(100L);
cp = checkpointing.get();
}
final WriteAheadRepository<DummyRecord> readRepo = new MinimalLockingWriteAheadLog<>(path, 256, serde, null);
// ensure that we are able to recover the records properly
final Collection<DummyRecord> readRecords = readRepo.recoverRecords();
for (final DummyRecord record : readRecords) {
assertEquals("B", record.getProperty("A"));
}
readRepo.shutdown();
}
}
@Test
public void testWrite() throws IOException, InterruptedException {
@@ -285,6 +449,40 @@ public class TestMinimalLockingWriteAheadLog {
}
}
private static class InlineCreationInsertThread extends Thread {
private final long iterations;
private final WriteAheadRepository<DummyRecord> repo;
public InlineCreationInsertThread(final long numInsertions, final WriteAheadRepository<DummyRecord> repo) {
this.iterations = numInsertions;
this.repo = repo;
}
@Override
public void run() {
final List<DummyRecord> list = new ArrayList<>(1);
list.add(null);
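// reuse a single-element list across iterations; the slot is overwritten via list.set(0, record) below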
final UpdateType[] updateTypes = new UpdateType[] { UpdateType.CREATE, UpdateType.DELETE, UpdateType.UPDATE };
final Random random = new Random();
for (long i = 0; i < iterations; i++) {
final int updateTypeIndex = random.nextInt(updateTypes.length);
final UpdateType updateType = updateTypes[updateTypeIndex];
final DummyRecord record = new DummyRecord(String.valueOf(i), updateType);
record.setProperty("A", "B");
list.set(0, record);
try {
repo.update(list, false);
} catch (final Throwable t) {
t.printStackTrace();
}
}
}
}
private void deleteRecursively(final File file) {
final File[] children = file.listFiles();
if (children != null) {
@@ -295,4 +493,21 @@ public class TestMinimalLockingWriteAheadLog {
file.delete();
}
private long sizeOf(final File file) {
long size = 0L;
if (file.isDirectory()) {
final File[] children = file.listFiles();
if (children != null) {
for (final File child : children) {
size += sizeOf(child);
}
}
}
size += file.length();
return size;
}
}