From 5e2559db427af7306128e18414f5026ae35cf517 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 22 Apr 2019 10:17:16 -0400 Subject: [PATCH] NIFI-6236: Update VolatileFlowFileRepository to decrement claimant counts when FlowFiles are deleted This closes #3451. Signed-off-by: Bryan Bende --- .../VolatileFlowFileRepository.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java index 979a22e78a..3881f09139 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileFlowFileRepository.java @@ -24,6 +24,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; /** @@ -94,6 +95,8 @@ public class VolatileFlowFileRepository implements FlowFileRepository { @Override public void updateRepository(final Collection records) throws IOException { for (final RepositoryRecord record : records) { + updateClaimCounts(record); + if (record.getType() == RepositoryRecordType.DELETE) { // For any DELETE record that we have, if current claim's claimant count <= 0, mark it as destructable if (record.getCurrentClaim() != null && getClaimantCount(record.getCurrentClaim()) <= 0) { @@ -113,6 +116,29 @@ public class VolatileFlowFileRepository implements FlowFileRepository { } } + private void updateClaimCounts(final RepositoryRecord record) { + final ContentClaim currentClaim = record.getCurrentClaim(); + final ContentClaim originalClaim = record.getOriginalClaim(); + final boolean claimChanged = !Objects.equals(currentClaim, originalClaim); + + if (record.getType() == RepositoryRecordType.DELETE || record.getType() == RepositoryRecordType.CONTENTMISSING) { + decrementClaimCount(currentClaim); + } + + if (claimChanged) { + // records which have been updated - remove original if exists + decrementClaimCount(originalClaim); + } + } + + private void decrementClaimCount(final ContentClaim claim) { + if (claim == null) { + return; + } + + claimManager.decrementClaimantCount(claim.getResourceClaim()); + } + @Override public long loadFlowFiles(final QueueProvider queueProvider) throws IOException { return 0;