diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java index d9fdc97fea..df7868d588 100644 --- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/LengthDelimitedJournal.java @@ -40,6 +40,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.file.Files; +import java.nio.file.Path; import java.text.DecimalFormat; import java.util.Collection; import java.util.HashMap; @@ -218,6 +219,11 @@ public class LengthDelimitedJournal implements WriteAheadJournal { } + // Visible/overrideable for testing. + protected void createOverflowDirectory(final Path path) throws IOException { + Files.createDirectories(path); + } + @Override public void update(final Collection records, final RecordLookup recordLookup) throws IOException { if (!headerWritten) { @@ -246,7 +252,7 @@ public class LengthDelimitedJournal implements WriteAheadJournal { final int size = bados.getByteArrayOutputStream().size(); if (serde.isWriteExternalFileReferenceSupported() && size > maxInHeapSerializationBytes) { if (!overflowDirectory.exists()) { - Files.createDirectory(overflowDirectory.toPath()); + createOverflowDirectory(overflowDirectory.toPath()); } // If we have exceeded our threshold for how much to serialize in memory, diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java index 94df890704..448654ef16 100644 --- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java +++ b/nifi-commons/nifi-write-ahead-log/src/test/java/org/apache/nifi/wali/TestLengthDelimitedJournal.java @@ 
-17,16 +17,21 @@ package org.apache.nifi.wali; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.wali.DummyRecord; +import org.wali.DummyRecordSerde; +import org.wali.SerDeFactory; +import org.wali.SingletonSerDeFactory; +import org.wali.UpdateType; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -36,15 +41,14 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.wali.DummyRecord; -import org.wali.DummyRecordSerde; -import org.wali.SerDeFactory; -import org.wali.SingletonSerDeFactory; -import org.wali.UpdateType; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class TestLengthDelimitedJournal { private final File journalFile = new File("target/testLengthDelimitedJournal/testJournal.journal"); @@ -307,6 +311,71 @@ public class TestLengthDelimitedJournal { } } + @Test + public void testMultipleThreadsCreatingOverflowDirectory() throws IOException, InterruptedException { + final LengthDelimitedJournal journal = new LengthDelimitedJournal(journalFile, serdeFactory, streamPool, 3820L, 100) { + @Override + protected void createOverflowDirectory(final Path path) throws IOException { + // Create the overflow directory. 
+ super.createOverflowDirectory(path); + + // Ensure that a second call to create the overflow directory will not cause an issue. + super.createOverflowDirectory(path); + } + }; + + // Ensure that the overflow directory does not exist. + journal.dispose(); + + try { + journal.writeHeader(); + + final List largeCollection1 = new ArrayList<>(); + for (int i=0; i < 1_000; i++) { + largeCollection1.add(new DummyRecord(String.valueOf(i), UpdateType.CREATE)); + } + final Map recordMap = largeCollection1.stream() + .collect(Collectors.toMap(DummyRecord::getId, rec -> rec)); + + final List largeCollection2 = new ArrayList<>(); + for (int i=0; i < 1_000; i++) { + largeCollection2.add(new DummyRecord(String.valueOf(5_000_000 + i), UpdateType.CREATE)); + } + final Map recordMap2 = largeCollection2.stream() + .collect(Collectors.toMap(DummyRecord::getId, rec -> rec)); + + final AtomicReference thread1Failure = new AtomicReference<>(); + final Thread t1 = new Thread(() -> { + try { + journal.update(largeCollection1, recordMap::get); + } catch (final Exception e) { + e.printStackTrace(); + thread1Failure.set(e); + } + }); + t1.start(); + + final AtomicReference thread2Failure = new AtomicReference<>(); + final Thread t2 = new Thread(() -> { + try { + journal.update(largeCollection2, recordMap2::get); + } catch (final Exception e) { + e.printStackTrace(); + thread2Failure.set(e); + } + }); + t2.start(); + + t1.join(); + t2.join(); + + assertNull(thread1Failure.get()); + assertNull(thread2Failure.get()); + } finally { + journal.close(); + } + } + @Test public void testTruncatedJournalFile() throws IOException { final DummyRecord firstRecord, secondRecord; diff --git a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java index 920349365b..cc8a4c1f54 100644 --- a/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java +++ 
b/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/DummyRecordSerde.java @@ -43,7 +43,7 @@ public class DummyRecordSerde implements SerDe { @SuppressWarnings("fallthrough") @Override - public void serializeEdit(final DummyRecord previousState, final DummyRecord record, final DataOutputStream out) throws IOException { + public synchronized void serializeEdit(final DummyRecord previousState, final DummyRecord record, final DataOutputStream out) throws IOException { if (throwIOEAfterNserializeEdits >= 0 && (serializeEditCount++ >= throwIOEAfterNserializeEdits)) { throw new IOException("Serialized " + (serializeEditCount - 1) + " records successfully, so now it's time to throw IOE"); } @@ -80,13 +80,13 @@ public class DummyRecordSerde implements SerDe { } @Override - public void serializeRecord(final DummyRecord record, final DataOutputStream out) throws IOException { + public synchronized void serializeRecord(final DummyRecord record, final DataOutputStream out) throws IOException { serializeEdit(null, record, out); } @Override @SuppressWarnings("fallthrough") - public DummyRecord deserializeRecord(final DataInputStream in, final int version) throws IOException { + public synchronized DummyRecord deserializeRecord(final DataInputStream in, final int version) throws IOException { if (externalRecords != null) { final DummyRecord record = externalRecords.poll(); if (record != null) { @@ -122,7 +122,7 @@ public class DummyRecordSerde implements SerDe { } @Override - public boolean isMoreInExternalFile() { + public synchronized boolean isMoreInExternalFile() { return externalRecords != null && !externalRecords.isEmpty(); } @@ -189,11 +189,11 @@ public class DummyRecordSerde implements SerDe { return 1; } - public void setThrowIOEAfterNSerializeEdits(final int n) { + public synchronized void setThrowIOEAfterNSerializeEdits(final int n) { this.throwIOEAfterNserializeEdits = n; } - public void setThrowOOMEAfterNSerializeEdits(final int n) { + public synchronized void 
setThrowOOMEAfterNSerializeEdits(final int n) { this.throwOOMEAfterNserializeEdits = n; } @@ -208,7 +208,7 @@ public class DummyRecordSerde implements SerDe { } @Override - public void writeExternalFileReference(final File externalFile, final DataOutputStream out) throws IOException { + public synchronized void writeExternalFileReference(final File externalFile, final DataOutputStream out) throws IOException { out.write(EXTERNAL_FILE_INDICATOR); out.writeUTF(externalFile.getAbsolutePath()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 604eb7e8de..7a87678e41 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -359,8 +359,25 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final long updateProvenanceStart = System.nanoTime(); updateProvenanceRepo(checkpoint); - final long claimRemovalStart = System.nanoTime(); - final long updateProvenanceNanos = claimRemovalStart - updateProvenanceStart; + final long flowFileRepoUpdateStart = System.nanoTime(); + final long updateProvenanceNanos = flowFileRepoUpdateStart - updateProvenanceStart; + + // Update the FlowFile Repository + try { + final Collection repoRecords = checkpoint.records.values(); + context.getFlowFileRepository().updateRepository((Collection) repoRecords); + } catch (final IOException ioe) { + // if we fail to commit the session, we need to roll back + // the checkpoints as well because none of the checkpoints + // were ever 
committed. + rollback(false, true); + throw new ProcessException("FlowFile Repository failed to update", ioe); + } + + final long flowFileRepoUpdateFinishNanos = System.nanoTime(); + final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - flowFileRepoUpdateStart; + + final long claimRemovalStart = flowFileRepoUpdateFinishNanos; /** * Figure out which content claims can be released. At this point, @@ -401,25 +418,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final long claimRemovalFinishNanos = System.nanoTime(); final long claimRemovalNanos = claimRemovalFinishNanos - claimRemovalStart; - // Update the FlowFile Repository - try { - final Collection repoRecords = checkpoint.records.values(); - context.getFlowFileRepository().updateRepository((Collection) repoRecords); - } catch (final IOException ioe) { - // if we fail to commit the session, we need to roll back - // the checkpoints as well because none of the checkpoints - // were ever committed. - rollback(false, true); - throw new ProcessException("FlowFile Repository failed to update", ioe); - } - - final long flowFileRepoUpdateFinishNanos = System.nanoTime(); - final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - claimRemovalFinishNanos; - updateEventRepository(checkpoint); final long updateEventRepositoryFinishNanos = System.nanoTime(); - final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - flowFileRepoUpdateFinishNanos; + final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - claimRemovalFinishNanos; // transfer the flowfiles to the connections' queues. 
final Map> recordMap = new HashMap<>(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index d21cc5ce4d..b001e7921a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -92,6 +92,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.notNull; @@ -324,6 +325,60 @@ public class TestStandardProcessSession { } + @Test + public void testUpdateFlowFileRepoFailsOnSessionCommit() throws IOException { + final ContentClaim contentClaim = contentRepo.create("original".getBytes()); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .size(8L) + .contentClaim(contentClaim) + .build(); + + flowFileQueue.put(flowFileRecord); + + final Relationship relationship = new Relationship.Builder().name("A").build(); + + FlowFile ff1 = session.get(); + assertNotNull(ff1); + + // Fork a child FlowFile. 
+ final FlowFile child = session.create(flowFileRecord); + final FlowFile updated = session.write(flowFileRecord, out -> out.write("update".getBytes())); + final ContentClaim updatedContentClaim = ((FlowFileRecord) updated).getContentClaim(); + + session.remove(updated); + final FlowFile updatedChild = session.write(child, out -> out.write("hello".getBytes(StandardCharsets.UTF_8))); + session.transfer(updatedChild, relationship); + + final ContentClaim childContentClaim = ((FlowFileRecord) updatedChild).getContentClaim(); + + flowFileRepo.setFailOnUpdate(true); + + assertEquals(1, contentRepo.getClaimantCount(contentClaim)); + + // these will be the same content claim due to how the StandardProcessSession adds multiple FlowFiles' contents to a single claim. + assertSame(updatedContentClaim, childContentClaim); + assertEquals(2, contentRepo.getClaimantCount(childContentClaim)); + + try { + session.commit(); + Assert.fail("Expected session commit to fail"); + } catch (final ProcessException pe) { + // Expected + } + + // Ensure that if we fail to update the flowfile repo, that the claimant count of the 'original' flowfile, which was removed, does not get decremented. + assertEquals(1, contentRepo.getClaimantCount(contentClaim)); + assertEquals(0, contentRepo.getClaimantCount(updatedContentClaim)); // temporary claim should be cleaned up. + assertEquals(0, contentRepo.getClaimantCount(childContentClaim)); // temporary claim should be cleaned up. + + assertEquals(1, flowFileQueue.size().getObjectCount()); + assertEquals(8L, flowFileQueue.size().getByteCount()); + } + @Test public void testCloneOriginalDataSmaller() throws IOException { final byte[] originalContent = "hello".getBytes();