From 4e08ea65254edd80d487ffc17b5046c37184467c Mon Sep 17 00:00:00 2001
From: Mark Payne
Date: Sat, 23 Jul 2016 15:05:20 -0400
Subject: [PATCH] NIFI-2376 This closes #713. Ensure that we don't decrement
 claimant count more than once when append() throws an Exception

---
 .../controller/StandardFlowFileQueue.java          |   2 +-
 .../repository/FileSystemRepository.java           |   5 +-
 .../repository/StandardProcessSession.java         |  53 ++++++--
 .../repository/TestFileSystemRepository.java       |  49 +++++++
 .../TestStandardProcessSession.java                | 120 +++++++++++++++++-
 5 files changed, 212 insertions(+), 17 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index f391da5551..27bbd69ced 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -79,7 +79,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  * processing. Must be thread safe.
  *
  */
-public final class StandardFlowFileQueue implements FlowFileQueue {
+public class StandardFlowFileQueue implements FlowFileQueue {
 
     public static final int MAX_EXPIRED_RECORDS_PER_ITERATION = 100000;
     public static final int SWAP_RECORD_POLL_SIZE = 10000;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 673440fa6e..e350a9056f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -964,7 +964,7 @@ public class FileSystemRepository implements ContentRepository {
                 final boolean enqueued = writableClaimQueue.offer(pair);
 
                 if (enqueued) {
-                    LOG.debug("Claim length less than max; Adding {} back to writableClaimStreams", this);
+                    LOG.debug("Claim length less than max; Leaving {} in writableClaimStreams map", this);
                 } else {
                     writableClaimStreams.remove(scc.getResourceClaim());
                     bcos.close();
@@ -1103,7 +1103,8 @@ public class FileSystemRepository implements ContentRepository {
         return Files.exists(getArchivePath(contentClaim.getResourceClaim()));
     }
 
-    private boolean archive(final ResourceClaim claim) throws IOException {
+    // visible for testing
+    boolean archive(final ResourceClaim claim) throws IOException {
         if (!archiveData) {
             return false;
         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 0a2f8c9e7a..f6cb8a1b32 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -330,13 +330,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 if (record.isMarkedForDelete()) {
                     // if the working claim is not the same as the original claim, we can immediately destroy the working claim
                     // because it was created in this session and is to be deleted. We don't need to wait for the FlowFile Repo to sync.
-                    removeContent(record.getWorkingClaim());
+                    decrementClaimCount(record.getWorkingClaim());
 
                     if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getWorkingClaim())) {
                         // if working & original claim are same, don't remove twice; we only want to remove the original
                         // if it's different from the working. Otherwise, we remove two claimant counts. This causes
                         // an issue if we only updated the FlowFile attributes.
-                        removeContent(record.getOriginalClaim());
+                        decrementClaimCount(record.getOriginalClaim());
                     }
 
                     final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
                     final Connectable connectable = context.getConnectable();
@@ -344,7 +344,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                     LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[] {flowFile, terminator, flowFileLife});
                 } else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) {
                     // records which have been updated - remove original if exists
-                    removeContent(record.getOriginalClaim());
+                    decrementClaimCount(record.getOriginalClaim());
                 }
             }
@@ -923,12 +923,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         final Set<RepositoryRecord> transferRecords = new HashSet<>();
         for (final StandardRepositoryRecord record : recordsToHandle) {
             if (record.isMarkedForAbort()) {
-                removeContent(record.getWorkingClaim());
+                decrementClaimCount(record.getWorkingClaim());
                 if (record.getCurrentClaim() != null && !record.getCurrentClaim().equals(record.getWorkingClaim())) {
                     // if working & original claim are same, don't remove twice; we only want to remove the original
                     // if it's different from the working. Otherwise, we remove two claimant counts. This causes
                     // an issue if we only updated the flowfile attributes.
-                    removeContent(record.getCurrentClaim());
+                    decrementClaimCount(record.getCurrentClaim());
                 }
                 abortedRecords.add(record);
             } else {
@@ -1020,7 +1020,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
-    private void removeContent(final ContentClaim claim) {
+    private void decrementClaimCount(final ContentClaim claim) {
         if (claim == null) {
             return;
         }
@@ -1733,7 +1733,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                     record.markForDelete();
                     expiredRecords.add(record);
                     expiredReporter.expire(flowFile, "Expiration Threshold = " + connection.getFlowFileQueue().getFlowFileExpiration());
-                    removeContent(flowFile.getContentClaim());
+                    decrementClaimCount(flowFile.getContentClaim());
 
                     final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
                    final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
@@ -2198,9 +2198,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 originalByteWrittenCount = outStream.getBytesWritten();
 
                 // wrap our OutputStreams so that the processor cannot close it
-                try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(outStream)) {
+                try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(outStream);
+                    final OutputStream flowFileAccessOutStream = new FlowFileAccessOutputStream(disableOnClose, source)) {
                     recursionSet.add(source);
-                    writer.process(disableOnClose);
+                    writer.process(flowFileAccessOutStream);
                 } finally {
                     recursionSet.remove(source);
                 }
@@ -2210,15 +2211,37 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             newSize = outStream.getBytesWritten();
         } catch (final ContentNotFoundException nfe) {
             resetWriteClaims(); // need to reset write claim before we can remove the claim
-            destroyContent(newClaim);
+
+            // If the content claim changed, then we should destroy the new one. We do this
+            // because the new content claim will never get set as the 'working claim' for the FlowFile
+            // record since we will throw an Exception. As a result, we need to ensure that we have
+            // appropriately decremented the claimant count and can destroy the content if it is no
+            // longer in use. However, it is critical that we do this ONLY if the content claim has
+            // changed. Otherwise, the FlowFile already has a reference to this Content Claim and
+            // whenever the FlowFile is removed, the claim count will be decremented; if we decremented
+            // it here also, we would be decrementing the claimant count twice!
+            if (newClaim != oldClaim) {
+                destroyContent(newClaim);
+            }
+
             handleContentNotFound(nfe, record);
         } catch (final IOException ioe) {
             resetWriteClaims(); // need to reset write claim before we can remove the claim
-            destroyContent(newClaim);
-            throw new FlowFileAccessException("Exception in callback: " + ioe.toString(), ioe);
+
+            // See above explanation for why this is done only if newClaim != oldClaim
+            if (newClaim != oldClaim) {
+                destroyContent(newClaim);
+            }
+
+            throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ioe.toString(), ioe);
         } catch (final Throwable t) {
             resetWriteClaims(); // need to reset write claim before we can remove the claim
-            destroyContent(newClaim);
+
+            // See above explanation for why this is done only if newClaim != oldClaim
+            if (newClaim != oldClaim) {
+                destroyContent(newClaim);
+            }
+
             throw t;
         } finally {
             if (outStream != null) {
@@ -2227,6 +2250,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             }
         }
 
+        // If the record already has a working claim, and this is the first time that we are appending to the FlowFile,
+        // destroy the current working claim because it is a temporary claim that is no longer going to be used,
+        // as we are about to set a new working claim. This would happen, for instance, if the FlowFile was
+        // written to, via #write(), and then append() was called.
         if (newClaim != oldClaim) {
             removeTemporaryClaim(record);
         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index c00b91ada8..b36a5e67be 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -495,6 +495,55 @@ public class TestFileSystemRepository {
     }
 
+    @Test
+    public void testWriteCannotProvideNullOutput() throws IOException {
+        FileSystemRepository repository = null;
+        try {
+            final List<Path> archivedPathsWithOpenStream = Collections.synchronizedList(new ArrayList<Path>());
+
+            // We are creating our own 'local' repository in this test so shut down the one created in the setup() method
+            shutdown();
+
+            repository = new FileSystemRepository() {
+                @Override
+                protected boolean archive(Path curPath) throws IOException {
+                    if (getOpenStreamCount() > 0) {
+                        archivedPathsWithOpenStream.add(curPath);
+                    }
+
+                    return true;
+                }
+            };
+
+            final StandardResourceClaimManager claimManager = new StandardResourceClaimManager();
+            repository.initialize(claimManager);
+            repository.purge();
+
+            final ContentClaim claim = repository.create(false);
+
+            assertEquals(1, claimManager.getClaimantCount(claim.getResourceClaim()));
+
+            int claimantCount = claimManager.decrementClaimantCount(claim.getResourceClaim());
+            assertEquals(0, claimantCount);
+            assertTrue(archivedPathsWithOpenStream.isEmpty());
+
+            OutputStream out = repository.write(claim);
+            out.close();
+            repository.decrementClaimantCount(claim);
+
+            ContentClaim claim2 = repository.create(false);
+            assertEquals(claim.getResourceClaim(), claim2.getResourceClaim());
+            out = repository.write(claim2);
+
+            final boolean archived = repository.archive(claim.getResourceClaim());
+            assertFalse(archived);
+        } finally {
+            if (repository != null) {
+                repository.shutdown();
+            }
+        }
+    }
+
     /**
      * We have encountered a situation where the File System Repo is moving files to archive and then eventually
      * aging them off while there is still an open file handle. This test is meant to replicate the conditions under
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index c1bcacd98a..2f3bff5216 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -68,6 +68,7 @@ import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.exception.MissingFlowFileException;
@@ -144,7 +145,8 @@ public class TestStandardProcessSession {
         final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);
 
         final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class);
-        flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000);
+        final StandardFlowFileQueue actualQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000);
+        flowFileQueue = Mockito.spy(actualQueue);
         when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
 
         Mockito.doAnswer(new Answer<Object>() {
@@ -206,6 +208,71 @@ public class TestStandardProcessSession {
         session = new StandardProcessSession(context);
     }
 
+    @Test
+    public void testAppendToChildThrowsIOExceptionThenRemove() throws IOException {
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .id(1000L)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
+        flowFileQueue.put(flowFileRecord);
+        FlowFile original = session.get();
+        assertNotNull(original);
+
+        FlowFile child = session.create(original);
+        child = session.append(child, out -> out.write("hello".getBytes()));
+
+        // Force an IOException. This will decrement our claim count for the resource claim.
+        try {
+            child = session.append(child, out -> {
+                throw new IOException();
+            });
+            Assert.fail("append() callback threw IOException but it was not wrapped in ProcessException");
+        } catch (final ProcessException pe) {
+            // expected
+        }
+
+        session.remove(child);
+        session.transfer(original);
+        session.commit();
+
+        final int numClaims = contentRepo.getExistingClaims().size();
+        assertEquals(0, numClaims);
+    }
+
+    @Test
+    public void testWriteForChildThrowsIOExceptionThenRemove() throws IOException {
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .id(1000L)
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .build();
+        flowFileQueue.put(flowFileRecord);
+        FlowFile original = session.get();
+        assertNotNull(original);
+
+        FlowFile child = session.create(original);
+        // Force an IOException. This will decrement our claim count for the resource claim.
+        try {
+            child = session.write(child, out -> out.write("hello".getBytes()));
+
+            child = session.write(child, out -> {
+                throw new IOException();
+            });
+            Assert.fail("write() callback threw IOException but it was not wrapped in ProcessException");
+        } catch (final ProcessException pe) {
+            // expected
+        }
+
+        session.remove(child);
+        session.transfer(original);
+        session.commit();
+
+        final int numClaims = contentRepo.getExistingClaims().size();
+        assertEquals(0, numClaims);
+    }
+
+
     @Test
     public void testModifyContentThenRollback() throws IOException {
         assertEquals(0, contentRepo.getExistingClaims().size());
@@ -806,6 +873,57 @@ public class TestStandardProcessSession {
         assertEquals("Hello, World", new String(buff));
     }
 
+    @Test
+    public void testAppendDoesNotDecrementContentClaimIfNotNeeded() {
+        FlowFile flowFile = session.create();
+
+        session.append(flowFile, new OutputStreamCallback() {
+            @Override
+            public void process(OutputStream out) throws IOException {
+                out.write("hello".getBytes());
+            }
+        });
+
+        final Set<ContentClaim> existingClaims = contentRepo.getExistingClaims();
+        assertEquals(1, existingClaims.size());
+        final ContentClaim claim = existingClaims.iterator().next();
+
+        final int countAfterAppend = contentRepo.getClaimantCount(claim);
+        assertEquals(1, countAfterAppend);
+    }
+
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testExpireDecrementsClaimsOnce() throws IOException {
+        final ContentClaim contentClaim = contentRepo.create(false);
+
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+            .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+            .entryDate(System.currentTimeMillis())
+            .contentClaim(contentClaim)
+            .build();
+
+        Mockito.doAnswer(new Answer<List<FlowFileRecord>>() {
+            int iterations = 0;
+
+            @Override
+            public List<FlowFileRecord> answer(InvocationOnMock invocation) throws Throwable {
+                if (iterations++ == 0) {
+                    final Set<FlowFileRecord> expired = invocation.getArgumentAt(1, Set.class);
+                    expired.add(flowFileRecord);
+                }
+
+                return null;
+            }
+        }).when(flowFileQueue).poll(Mockito.any(FlowFileFilter.class), Mockito.any(Set.class));
+
+        session.expireFlowFiles();
+        session.commit(); // if the content claim count is decremented to less than 0, an exception will be thrown.
+
+        assertEquals(1L, contentRepo.getClaimsRemoved());
+    }
+
     @Test
     public void testManyFilesOpened() throws IOException {
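
Note: with this patch, an IOException thrown from a write()/append() callback surfaces as a ProcessException (previously a FlowFileAccessException), and the claimant count for the FlowFile's content claim is decremented exactly once. Below is a minimal caller-side sketch of that contract; the processor class and relationship names are illustrative assumptions and are not part of this patch.

import java.io.IOException;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

// Hypothetical processor used only to illustrate the new exception contract.
public class AppendExampleProcessor extends AbstractProcessor {
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").build();

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        try {
            // If the callback throws an IOException, the session wraps it in a
            // ProcessException and decrements the claimant count only once.
            flowFile = session.append(flowFile, out -> out.write("appended".getBytes()));
        } catch (final ProcessException pe) {
            // The FlowFile can still be routed (or removed) without the
            // claimant count being decremented a second time.
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        session.transfer(flowFile, REL_SUCCESS);
    }
}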