NIFI-6236: Update VolatileFlowFileRepository to decrement claimant counts when FlowFiles are deleted

This closes #3451.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2019-04-22 10:17:16 -04:00 committed by Bryan Bende
parent 055b3cac54
commit 5e2559db42
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39

View File

@ -24,6 +24,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
/**
@ -94,6 +95,8 @@ public class VolatileFlowFileRepository implements FlowFileRepository {
@Override
public void updateRepository(final Collection<RepositoryRecord> records) throws IOException {
for (final RepositoryRecord record : records) {
updateClaimCounts(record);
if (record.getType() == RepositoryRecordType.DELETE) {
// For any DELETE record that we have, if current claim's claimant count <= 0, mark it as destructable
if (record.getCurrentClaim() != null && getClaimantCount(record.getCurrentClaim()) <= 0) {
@ -113,6 +116,29 @@ public class VolatileFlowFileRepository implements FlowFileRepository {
}
}
private void updateClaimCounts(final RepositoryRecord record) {
final ContentClaim currentClaim = record.getCurrentClaim();
final ContentClaim originalClaim = record.getOriginalClaim();
final boolean claimChanged = !Objects.equals(currentClaim, originalClaim);
if (record.getType() == RepositoryRecordType.DELETE || record.getType() == RepositoryRecordType.CONTENTMISSING) {
decrementClaimCount(currentClaim);
}
if (claimChanged) {
// records which have been updated - remove original if exists
decrementClaimCount(originalClaim);
}
}
private void decrementClaimCount(final ContentClaim claim) {
if (claim == null) {
return;
}
claimManager.decrementClaimantCount(claim.getResourceClaim());
}
@Override
public long loadFlowFiles(final QueueProvider queueProvider) throws IOException {
return 0;