From 58a25cfa5a9b9e4af9a5e9b7e5c3be9a676d8e87 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 8 Mar 2019 08:55:53 -0500 Subject: [PATCH] NIFI-6110: Updated StandardProcessSession such that if we fail to update the FlowFile Repository, we do not decrement claimant counts for any FlowFiles that were removed. Doing so can cause an issue where a FlowFile is removed, then the FlowFileRepo update fails, resulting in the flowfile being rolled back, but after its claimant count is decremented. It will then be processed again, which can result in the same thing, and we'll end up decrementing the claimant count repeatedly. Also updated LengthDelimitedJournal so that if the overflow directory already exists, it does not fail when trying to create the directory and instead just moves on. Updated unit tests to test both of these new fixes and updated DummyRecordSerde to be thread-safe because the TestLengthDelimitedJournal now needs it to be thread safe due to the new unit test that was added. This closes #3358. Signed-off-by: Bryan Bende --- .../nifi/wali/LengthDelimitedJournal.java | 8 +- .../nifi/wali/TestLengthDelimitedJournal.java | 93 ++++++++++++++++--- .../test/java/org/wali/DummyRecordSerde.java | 14 +-- .../repository/StandardProcessSession.java | 38 ++++---- .../TestStandardProcessSession.java | 55 +++++++++++ 5 files changed, 170 insertions(+), 38 deletions(-) diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java index d9fdc97fea..df7868d588 100644 --- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java @@ -40,6 +40,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.file.Files; +import java.nio.file.Path; import java.text.DecimalFormat; import java.util.Collection; import java.util.HashMap; @@ -218,6 +219,11 @@ public class LengthDelimitedJournal implements WriteAheadJournal { } + // Visible/overrideable for testing. + protected void createOverflowDirectory(final Path path) throws IOException { + Files.createDirectories(path); + } + @Override public void update(final Collection records, final RecordLookup recordLookup) throws IOException { if (!headerWritten) { @@ -246,7 +252,7 @@ public class LengthDelimitedJournal implements WriteAheadJournal { final int size = bados.getByteArrayOutputStream().size(); if (serde.isWriteExternalFileReferenceSupported() && size > maxInHeapSerializationBytes) { if (!overflowDirectory.exists()) { - Files.createDirectory(overflowDirectory.toPath()); + createOverflowDirectory(overflowDirectory.toPath()); } // If we have exceeded our threshold for how much to serialize in memory, diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java index 94df890704..448654ef16 100644 --- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java +++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java @@ -17,16 +17,21 @@ package org.apache.nifi.wali; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.wali.DummyRecord; +import org.wali.DummyRecordSerde; +import org.wali.SerDeFactory; +import org.wali.SingletonSerDeFactory; +import org.wali.UpdateType; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -36,15 +41,14 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.wali.DummyRecord; -import org.wali.DummyRecordSerde; -import org.wali.SerDeFactory; -import org.wali.SingletonSerDeFactory; -import org.wali.UpdateType; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class TestLengthDelimitedJournal { private final File journalFile = new File("target/testLengthDelimitedJournal/testJournal.journal"); @@ -307,6 +311,71 @@ public class TestLengthDelimitedJournal { } } + @Test + public void testMultipleThreadsCreatingOverflowDirectory() throws IOException, InterruptedException { + final LengthDelimitedJournal journal = new LengthDelimitedJournal(journalFile, serdeFactory, streamPool, 3820L, 100) { + @Override + protected void createOverflowDirectory(final Path path) throws IOException { + // Create the overflow directory. + super.createOverflowDirectory(path); + + // Ensure that a second call to create the overflow directory will not cause an issue. + super.createOverflowDirectory(path); + } + }; + + // Ensure that the overflow directory does not exist. + journal.dispose(); + + try { + journal.writeHeader(); + + final List largeCollection1 = new ArrayList<>(); + for (int i=0; i < 1_000; i++) { + largeCollection1.add(new DummyRecord(String.valueOf(i), UpdateType.CREATE)); + } + final Map recordMap = largeCollection1.stream() + .collect(Collectors.toMap(DummyRecord::getId, rec -> rec)); + + final List largeCollection2 = new ArrayList<>(); + for (int i=0; i < 1_000; i++) { + largeCollection2.add(new DummyRecord(String.valueOf(5_000_000 + i), UpdateType.CREATE)); + } + final Map recordMap2 = largeCollection2.stream() + .collect(Collectors.toMap(DummyRecord::getId, rec -> rec)); + + final AtomicReference thread1Failure = new AtomicReference<>(); + final Thread t1 = new Thread(() -> { + try { + journal.update(largeCollection1, recordMap::get); + } catch (final Exception e) { + e.printStackTrace(); + thread1Failure.set(e); + } + }); + t1.start(); + + final AtomicReference thread2Failure = new AtomicReference<>(); + final Thread t2 = new Thread(() -> { + try { + journal.update(largeCollection2, recordMap2::get); + } catch (final Exception e) { + e.printStackTrace(); + thread2Failure.set(e); + } + }); + t2.start(); + + t1.join(); + t2.join(); + + assertNull(thread1Failure.get()); + assertNull(thread2Failure.get()); + } finally { + journal.close(); + } + } + @Test public void testTruncatedJournalFile() throws IOException { final DummyRecord firstRecord, secondRecord; diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java index 920349365b..cc8a4c1f54 100644 --- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java +++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java @@ -43,7 +43,7 @@ public class DummyRecordSerde implements SerDe { @SuppressWarnings("fallthrough") @Override - public void serializeEdit(final DummyRecord previousState, final DummyRecord record, final DataOutputStream out) throws IOException { + public synchronized void serializeEdit(final DummyRecord previousState, final DummyRecord record, final DataOutputStream out) throws IOException { if (throwIOEAfterNserializeEdits >= 0 && (serializeEditCount++ >= throwIOEAfterNserializeEdits)) { throw new IOException("Serialized " + (serializeEditCount - 1) + " records successfully, so now it's time to throw IOE"); } @@ -80,13 +80,13 @@ public class DummyRecordSerde implements SerDe { } @Override - public void serializeRecord(final DummyRecord record, final DataOutputStream out) throws IOException { + public synchronized void serializeRecord(final DummyRecord record, final DataOutputStream out) throws IOException { serializeEdit(null, record, out); } @Override @SuppressWarnings("fallthrough") - public DummyRecord deserializeRecord(final DataInputStream in, final int version) throws IOException { + public synchronized DummyRecord deserializeRecord(final DataInputStream in, final int version) throws IOException { if (externalRecords != null) { final DummyRecord record = externalRecords.poll(); if (record != null) { @@ -122,7 +122,7 @@ public class DummyRecordSerde implements SerDe { } @Override - public boolean isMoreInExternalFile() { + public synchronized boolean isMoreInExternalFile() { return externalRecords != null && !externalRecords.isEmpty(); } @@ -189,11 +189,11 @@ public class DummyRecordSerde implements SerDe { return 1; } - public void setThrowIOEAfterNSerializeEdits(final int n) { + public synchronized void setThrowIOEAfterNSerializeEdits(final int n) { this.throwIOEAfterNserializeEdits = n; } - public void setThrowOOMEAfterNSerializeEdits(final int n) { + public synchronized void setThrowOOMEAfterNSerializeEdits(final int n) { this.throwOOMEAfterNserializeEdits = n; } @@ -208,7 +208,7 @@ public class DummyRecordSerde implements SerDe { } @Override - public void writeExternalFileReference(final File externalFile, final DataOutputStream out) throws IOException { + public synchronized void writeExternalFileReference(final File externalFile, final DataOutputStream out) throws IOException { out.write(EXTERNAL_FILE_INDICATOR); out.writeUTF(externalFile.getAbsolutePath()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 604eb7e8de..7a87678e41 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -359,8 +359,25 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final long updateProvenanceStart = System.nanoTime(); updateProvenanceRepo(checkpoint); - final long claimRemovalStart = System.nanoTime(); - final long updateProvenanceNanos = claimRemovalStart - updateProvenanceStart; + final long flowFileRepoUpdateStart = System.nanoTime(); + final long updateProvenanceNanos = flowFileRepoUpdateStart - updateProvenanceStart; + + // Update the FlowFile Repository + try { + final Collection repoRecords = checkpoint.records.values(); + context.getFlowFileRepository().updateRepository((Collection) repoRecords); + } catch (final IOException ioe) { + // if we fail to commit the session, we need to roll back + // the checkpoints as well because none of the checkpoints + // were ever committed. + rollback(false, true); + throw new ProcessException("FlowFile Repository failed to update", ioe); + } + + final long flowFileRepoUpdateFinishNanos = System.nanoTime(); + final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - flowFileRepoUpdateStart; + + final long claimRemovalStart = flowFileRepoUpdateFinishNanos; /** * Figure out which content claims can be released. At this point, @@ -401,25 +418,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final long claimRemovalFinishNanos = System.nanoTime(); final long claimRemovalNanos = claimRemovalFinishNanos - claimRemovalStart; - // Update the FlowFile Repository - try { - final Collection repoRecords = checkpoint.records.values(); - context.getFlowFileRepository().updateRepository((Collection) repoRecords); - } catch (final IOException ioe) { - // if we fail to commit the session, we need to roll back - // the checkpoints as well because none of the checkpoints - // were ever committed. - rollback(false, true); - throw new ProcessException("FlowFile Repository failed to update", ioe); - } - - final long flowFileRepoUpdateFinishNanos = System.nanoTime(); - final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - claimRemovalFinishNanos; - updateEventRepository(checkpoint); final long updateEventRepositoryFinishNanos = System.nanoTime(); - final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - flowFileRepoUpdateFinishNanos; + final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - claimRemovalFinishNanos; // transfer the flowfiles to the connections' queues. final Map> recordMap = new HashMap<>(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index d21cc5ce4d..b001e7921a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -92,6 +92,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.notNull; @@ -324,6 +325,60 @@ public class TestStandardProcessSession { } + @Test + public void testUpdateFlowFileRepoFailsOnSessionCommit() throws IOException { + final ContentClaim contentClaim = contentRepo.create("original".getBytes()); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .size(8L) + .contentClaim(contentClaim) + .build(); + + flowFileQueue.put(flowFileRecord); + + final Relationship relationship = new Relationship.Builder().name("A").build(); + + FlowFile ff1 = session.get(); + assertNotNull(ff1); + + // Fork a child FlowFile. + final FlowFile child = session.create(flowFileRecord); + final FlowFile updated = session.write(flowFileRecord, out -> out.write("update".getBytes())); + final ContentClaim updatedContentClaim = ((FlowFileRecord) updated).getContentClaim(); + + session.remove(updated); + final FlowFile updatedChild = session.write(child, out -> out.write("hello".getBytes(StandardCharsets.UTF_8))); + session.transfer(updatedChild, relationship); + + final ContentClaim childContentClaim = ((FlowFileRecord) updatedChild).getContentClaim(); + + flowFileRepo.setFailOnUpdate(true); + + assertEquals(1, contentRepo.getClaimantCount(contentClaim)); + + // these will be the same content claim due to how the StandardProcessSession adds multiple FlowFiles' contents to a single claim. + assertSame(updatedContentClaim, childContentClaim); + assertEquals(2, contentRepo.getClaimantCount(childContentClaim)); + + try { + session.commit(); + Assert.fail("Expected session commit to fail"); + } catch (final ProcessException pe) { + // Expected + } + + // Ensure that if we fail to update teh flowfile repo, that the claimant count of the 'original' flowfile, which was removed, does not get decremented. + assertEquals(1, contentRepo.getClaimantCount(contentClaim)); + assertEquals(0, contentRepo.getClaimantCount(updatedContentClaim)); // temporary claim should be cleaned up. + assertEquals(0, contentRepo.getClaimantCount(childContentClaim)); // temporary claim should be cleaned up. + + assertEquals(1, flowFileQueue.size().getObjectCount()); + assertEquals(8L, flowFileQueue.size().getByteCount()); + } + @Test public void testCloneOriginalDataSmaller() throws IOException { final byte[] originalContent = "hello".getBytes();