From 3d4ce34529533ff801a2febae7e669248cb80324 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 14 Sep 2015 11:47:02 -0400 Subject: [PATCH] NIFI-938: If ResourceClaim is removed while a process has access to its stream, don't delete the claim --- .../repository/FileSystemRepository.java | 11 ++++++++- .../repository/TestFileSystemRepository.java | 23 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index d06b462928..724e26e615 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -108,6 +108,7 @@ public class FileSystemRepository implements ContentRepository { // the OutputStream that we can use for writing to the claim. private final BlockingQueue writableClaimQueue = new LinkedBlockingQueue<>(100); private final ConcurrentMap writableClaimStreams = new ConcurrentHashMap<>(100); + private final Set activeResourceClaims = Collections.synchronizedSet(new HashSet()); private final boolean archiveData; private final long maxArchiveMillis; @@ -600,11 +601,15 @@ public class FileSystemRepository implements ContentRepository { // two conditions can be checked atomically. synchronized (writableClaimQueue) { final int claimantCount = resourceClaimManager.getClaimantCount(claim); - if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) { + if (claimantCount > 0) { // if other content claims are claiming the same resource, we have nothing to destroy, // so just consider the destruction successful. return true; } + if (activeResourceClaims.contains(claim) || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) { + // If we have an open OutputStream for the claim, we will not destroy the claim. + return false; + } } Path path = null; @@ -827,6 +832,8 @@ public class FileSystemRepository implements ContentRepository { throw new IllegalArgumentException("Cannot write to " + claim + " because it has already been written to."); } + final ResourceClaim resourceClaim = claim.getResourceClaim(); + // we always append because there may be another ContentClaim using the same resource claim. // However, we know that we will never write to the same claim from two different threads // at the same time because we will call create() to get the claim before we write to it, @@ -847,6 +854,7 @@ public class FileSystemRepository implements ContentRepository { } } + activeResourceClaims.add(resourceClaim); final ByteCountingOutputStream bcos = claimStream; final OutputStream out = new OutputStream() { private long bytesWritten = 0L; @@ -921,6 +929,7 @@ public class FileSystemRepository implements ContentRepository { @Override public synchronized void close() throws IOException { closed = true; + activeResourceClaims.remove(resourceClaim); if (alwaysSync) { ((FileOutputStream) bcos.getWrappedStream()).getFD().sync(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java index 5ffcb3de60..88f572bc6e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java @@ -347,6 +347,29 @@ public class TestFileSystemRepository { assertTrue(Arrays.equals(data, Files.readAllBytes(path))); } + @Test + public void testRemoveWhileWritingToClaim() throws IOException { + final ContentClaim claim = repository.create(false); + final OutputStream out = repository.write(claim); + + // write at least 1 MB to the output stream so that when we close the output stream + // the repo won't keep the stream open. + final byte[] buff = new byte[1024 * 1024]; + out.write(buff); + out.write(buff); + + // true because claimant count is still 1. + assertTrue(repository.remove(claim)); + + assertEquals(0, repository.decrementClaimantCount(claim)); + + // false because claimant count is 0 but there is an 'active' stream for the claim + assertFalse(repository.remove(claim)); + + out.close(); + assertTrue(repository.remove(claim)); + } + @Test public void testMergeWithHeaderFooterDemarcator() throws IOException { testMerge("HEADER", "FOOTER", "DEMARCATOR");