diff --git a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java index 5c8b4c8d9f..39fe65a3d1 100644 --- a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java +++ b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java @@ -60,7 +60,7 @@ import java.util.regex.Pattern; import org.apache.nifi.stream.io.BufferedInputStream; import org.apache.nifi.stream.io.BufferedOutputStream; - +import org.apache.nifi.stream.io.ByteCountingInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -226,10 +226,10 @@ public final class MinimalLockingWriteAheadLog implements WriteAheadRepositor try { partition.update(records, transactionId, unmodifiableRecordMap, forceSync); - } catch (final Exception e) { + } catch (final Throwable t) { partition.blackList(); numberBlackListedPartitions.incrementAndGet(); - throw e; + throw t; } if (forceSync && syncListener != null) { @@ -511,9 +511,10 @@ public final class MinimalLockingWriteAheadLog implements WriteAheadRepositor for (final Partition partition : partitions) { try { partition.rollover(); - } catch (final IOException ioe) { + } catch (final Throwable t) { partition.blackList(); - throw ioe; + numberBlackListedPartitions.getAndIncrement(); + throw t; } } @@ -878,7 +879,7 @@ public final class MinimalLockingWriteAheadLog implements WriteAheadRepositor } private DataInputStream createDataInputStream(final Path path) throws IOException { - return new DataInputStream(new BufferedInputStream(Files.newInputStream(path))); + return new DataInputStream(new ByteCountingInputStream(new BufferedInputStream(Files.newInputStream(path)))); } private DataInputStream getRecoveryStream() throws IOException { @@ -892,6 +893,7 @@ public final class MinimalLockingWriteAheadLog implements WriteAheadRepositor return null; } + logger.debug("{} recovering from {}", this, nextRecoveryPath); recoveryIn = createDataInputStream(nextRecoveryPath); if (hasMoreData(recoveryIn)) { final String waliImplementationClass = recoveryIn.readUTF(); @@ -972,8 +974,8 @@ public final class MinimalLockingWriteAheadLog implements WriteAheadRepositor int transactionFlag; do { final S record = serde.deserializeEdit(recoveryIn, currentRecordMap, recoveryVersion); - if (logger.isTraceEnabled()) { - logger.trace("{} Recovering Transaction {}: {}", new Object[]{this, maxTransactionId.get(), record}); + if (logger.isDebugEnabled()) { + logger.debug("{} Recovering Transaction {}: {}", new Object[] { this, maxTransactionId.get(), record }); } final Object recordId = serde.getRecordIdentifier(record); diff --git a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java index e0f7f969aa..bf15ba7206 100644 --- a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java +++ b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecord.java @@ -58,4 +58,9 @@ public class DummyRecord { public String getProperty(final String name) { return props.get(name); } + + @Override + public String toString() { + return "DummyRecord [id=" + id + ", props=" + props + ", updateType=" + updateType + "]"; + } } diff --git a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java index 8cc7860610..3a4e79f96d 100644 --- a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java +++ b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java @@ -26,6 +26,7 @@ public class DummyRecordSerde implements SerDe { public static final int NUM_UPDATE_TYPES = UpdateType.values().length; private int throwIOEAfterNserializeEdits = -1; + private int throwOOMEAfterNserializeEdits = -1; private int serializeEditCount = 0; @Override @@ -33,6 +34,9 @@ public class DummyRecordSerde implements SerDe { if (throwIOEAfterNserializeEdits >= 0 && (serializeEditCount++ >= throwIOEAfterNserializeEdits)) { throw new IOException("Serialized " + (serializeEditCount - 1) + " records successfully, so now it's time to throw IOE"); } + if (throwOOMEAfterNserializeEdits >= 0 && (serializeEditCount++ >= throwOOMEAfterNserializeEdits)) { + throw new OutOfMemoryError("Serialized " + (serializeEditCount - 1) + " records successfully, so now it's time to throw OOME"); + } out.write(record.getUpdateType().ordinal()); out.writeUTF(record.getId()); @@ -100,6 +104,10 @@ public class DummyRecordSerde implements SerDe { this.throwIOEAfterNserializeEdits = n; } + public void setThrowOOMEAfterNSerializeEdits(final int n) { + this.throwOOMEAfterNserializeEdits = n; + } + @Override public String getLocation(final DummyRecord record) { return null; diff --git a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java index 57f3495b70..29d2e7f4bc 100644 --- a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java +++ b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java @@ -28,16 +28,180 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestMinimalLockingWriteAheadLog { + private static final Logger logger = LoggerFactory.getLogger(TestMinimalLockingWriteAheadLog.class); + + + @Test + public void testRepoDoesntContinuallyGrowOnOutOfMemoryError() throws IOException, InterruptedException { + final int numPartitions = 8; + + final Path path = Paths.get("target/minimal-locking-repo"); + deleteRecursively(path.toFile()); + assertTrue(path.toFile().mkdirs()); + + final DummyRecordSerde serde = new DummyRecordSerde(); + final WriteAheadRepository repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serde, null); + try { + final Collection initialRecs = repo.recoverRecords(); + assertTrue(initialRecs.isEmpty()); + + serde.setThrowOOMEAfterNSerializeEdits(100); + for (int i = 0; i < 108; i++) { + try { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + repo.update(Collections.singleton(record), false); + } catch (final OutOfMemoryError oome) { + logger.info("Received OOME on record " + i); + } + } + + long expectedSize = sizeOf(path.toFile()); + for (int i = 0; i < 1000; i++) { + try { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + repo.update(Collections.singleton(record), false); + Assert.fail("Expected IOE but it didn't happen"); + } catch (final IOException ioe) { + // will get IOException because all Partitions have been blacklisted + } + } + + long newSize = sizeOf(path.toFile()); + assertEquals(expectedSize, newSize); + + try { + repo.checkpoint(); + Assert.fail("Expected OOME but it didn't happen"); + } catch (final OutOfMemoryError oome) { + } + + expectedSize = sizeOf(path.toFile()); + + for (int i = 0; i < 100000; i++) { + try { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + repo.update(Collections.singleton(record), false); + Assert.fail("Expected IOE but it didn't happen"); + } catch (final IOException ioe) { + // will get IOException because all Partitions have been blacklisted + } + } + + newSize = sizeOf(path.toFile()); + assertEquals(expectedSize, newSize); + } finally { + repo.shutdown(); + } + } + + /** + * This test is intended to continually update the Write-ahead log using many threads, then + * stop and restore the repository to check for any corruption. There were reports of potential threading + * issues leading to repository corruption. This was an attempt to replicate. It should not be run as a + * unit test, really, but will be left, as it can be valuable to exercise the implementation + * + * @throws IOException if unable to read from/write to the write-ahead log + * @throws InterruptedException if a thread is interrupted + */ + @Test + @Ignore + public void tryToCauseThreadingIssue() throws IOException, InterruptedException { + System.setProperty("org.slf4j.simpleLogger.log.org.wali", "INFO"); + + final int numThreads = 12; + final long iterationsPerThread = 1000000; + final int numAttempts = 1000; + + final Path path = Paths.get("D:/dummy/minimal-locking-repo"); + path.toFile().mkdirs(); + + final AtomicReference> writeRepoRef = new AtomicReference<>(); + final AtomicBoolean checkpointing = new AtomicBoolean(false); + + final Thread bgThread = new Thread(new Runnable() { + @Override + public void run() { + while (true) { + checkpointing.set(true); + + final WriteAheadRepository repo = writeRepoRef.get(); + if (repo != null) { + try { + repo.checkpoint(); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } + } + + checkpointing.set(false); + + try { + TimeUnit.SECONDS.sleep(5); + } catch (InterruptedException e) { + } + } + } + }); + bgThread.setDaemon(true); + bgThread.start(); + + for (int x = 0; x < numAttempts; x++) { + final DummyRecordSerde serde = new DummyRecordSerde(); + final WriteAheadRepository writeRepo = new MinimalLockingWriteAheadLog<>(path, 256, serde, null); + final Collection writeRecords = writeRepo.recoverRecords(); + for (final DummyRecord record : writeRecords) { + assertEquals("B", record.getProperty("A")); + } + + writeRepoRef.set(writeRepo); + + final Thread[] threads = new Thread[numThreads]; + for (int i = 0; i < numThreads; i++) { + final Thread t = new InlineCreationInsertThread(iterationsPerThread, writeRepo); + t.start(); + threads[i] = t; + } + + for (final Thread t : threads) { + t.join(); + } + + writeRepoRef.set(null); + writeRepo.shutdown(); + + boolean cp = checkpointing.get(); + while (cp) { + Thread.sleep(100L); + cp = checkpointing.get(); + } + + final WriteAheadRepository readRepo = new MinimalLockingWriteAheadLog<>(path, 256, serde, null); + // ensure that we are able to recover the records properly + final Collection readRecords = readRepo.recoverRecords(); + for (final DummyRecord record : readRecords) { + assertEquals("B", record.getProperty("A")); + } + readRepo.shutdown(); + } + } @Test public void testWrite() throws IOException, InterruptedException { @@ -285,6 +449,40 @@ public class TestMinimalLockingWriteAheadLog { } } + + private static class InlineCreationInsertThread extends Thread { + private final long iterations; + private final WriteAheadRepository repo; + + public InlineCreationInsertThread(final long numInsertions, final WriteAheadRepository repo) { + this.iterations = numInsertions; + this.repo = repo; + } + + @Override + public void run() { + final List list = new ArrayList<>(1); + list.add(null); + final UpdateType[] updateTypes = new UpdateType[] { UpdateType.CREATE, UpdateType.DELETE, UpdateType.UPDATE }; + final Random random = new Random(); + + for (long i = 0; i < iterations; i++) { + final int updateTypeIndex = random.nextInt(updateTypes.length); + final UpdateType updateType = updateTypes[updateTypeIndex]; + + final DummyRecord record = new DummyRecord(String.valueOf(i), updateType); + record.setProperty("A", "B"); + list.set(0, record); + + try { + repo.update(list, false); + } catch (final Throwable t) { + t.printStackTrace(); + } + } + } + } + private void deleteRecursively(final File file) { final File[] children = file.listFiles(); if (children != null) { @@ -295,4 +493,21 @@ public class TestMinimalLockingWriteAheadLog { file.delete(); } + + private long sizeOf(final File file) { + long size = 0L; + if (file.isDirectory()) { + final File[] children = file.listFiles(); + if (children != null) { + for (final File child : children) { + size += sizeOf(child); + } + } + } + + size += file.length(); + + return size; + } + }