NIFI-938: If ResourceClaim is removed while a process has access to its stream, don't delete the claim

This commit is contained in:
Mark Payne 2015-09-14 11:47:02 -04:00
parent 992e841027
commit 3d4ce34529
2 changed files with 33 additions and 1 deletions

View File

@ -108,6 +108,7 @@ public class FileSystemRepository implements ContentRepository {
// the OutputStream that we can use for writing to the claim. // the OutputStream that we can use for writing to the claim.
private final BlockingQueue<ClaimLengthPair> writableClaimQueue = new LinkedBlockingQueue<>(100); private final BlockingQueue<ClaimLengthPair> writableClaimQueue = new LinkedBlockingQueue<>(100);
private final ConcurrentMap<ResourceClaim, ByteCountingOutputStream> writableClaimStreams = new ConcurrentHashMap<>(100); private final ConcurrentMap<ResourceClaim, ByteCountingOutputStream> writableClaimStreams = new ConcurrentHashMap<>(100);
private final Set<ResourceClaim> activeResourceClaims = Collections.synchronizedSet(new HashSet<ResourceClaim>());
private final boolean archiveData; private final boolean archiveData;
private final long maxArchiveMillis; private final long maxArchiveMillis;
@ -600,11 +601,15 @@ public class FileSystemRepository implements ContentRepository {
// two conditions can be checked atomically. // two conditions can be checked atomically.
synchronized (writableClaimQueue) { synchronized (writableClaimQueue) {
final int claimantCount = resourceClaimManager.getClaimantCount(claim); final int claimantCount = resourceClaimManager.getClaimantCount(claim);
if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) { if (claimantCount > 0) {
// if other content claims are claiming the same resource, we have nothing to destroy, // if other content claims are claiming the same resource, we have nothing to destroy,
// so just consider the destruction successful. // so just consider the destruction successful.
return true; return true;
} }
if (activeResourceClaims.contains(claim) || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) {
// If we have an open OutputStream for the claim, we will not destroy the claim.
return false;
}
} }
Path path = null; Path path = null;
@ -827,6 +832,8 @@ public class FileSystemRepository implements ContentRepository {
throw new IllegalArgumentException("Cannot write to " + claim + " because it has already been written to."); throw new IllegalArgumentException("Cannot write to " + claim + " because it has already been written to.");
} }
final ResourceClaim resourceClaim = claim.getResourceClaim();
// we always append because there may be another ContentClaim using the same resource claim. // we always append because there may be another ContentClaim using the same resource claim.
// However, we know that we will never write to the same claim from two different threads // However, we know that we will never write to the same claim from two different threads
// at the same time because we will call create() to get the claim before we write to it, // at the same time because we will call create() to get the claim before we write to it,
@ -847,6 +854,7 @@ public class FileSystemRepository implements ContentRepository {
} }
} }
activeResourceClaims.add(resourceClaim);
final ByteCountingOutputStream bcos = claimStream; final ByteCountingOutputStream bcos = claimStream;
final OutputStream out = new OutputStream() { final OutputStream out = new OutputStream() {
private long bytesWritten = 0L; private long bytesWritten = 0L;
@ -921,6 +929,7 @@ public class FileSystemRepository implements ContentRepository {
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
closed = true; closed = true;
activeResourceClaims.remove(resourceClaim);
if (alwaysSync) { if (alwaysSync) {
((FileOutputStream) bcos.getWrappedStream()).getFD().sync(); ((FileOutputStream) bcos.getWrappedStream()).getFD().sync();

View File

@ -347,6 +347,29 @@ public class TestFileSystemRepository {
assertTrue(Arrays.equals(data, Files.readAllBytes(path))); assertTrue(Arrays.equals(data, Files.readAllBytes(path)));
} }
@Test
public void testRemoveWhileWritingToClaim() throws IOException {
final ContentClaim claim = repository.create(false);
final OutputStream out = repository.write(claim);
// write at least 1 MB to the output stream so that when we close the output stream
// the repo won't keep the stream open.
final byte[] buff = new byte[1024 * 1024];
out.write(buff);
out.write(buff);
// true because claimant count is still 1.
assertTrue(repository.remove(claim));
assertEquals(0, repository.decrementClaimantCount(claim));
// false because claimant count is 0 but there is an 'active' stream for the claim
assertFalse(repository.remove(claim));
out.close();
assertTrue(repository.remove(claim));
}
@Test @Test
public void testMergeWithHeaderFooterDemarcator() throws IOException { public void testMergeWithHeaderFooterDemarcator() throws IOException {
testMerge("HEADER", "FOOTER", "DEMARCATOR"); testMerge("HEADER", "FOOTER", "DEMARCATOR");