mirror of https://github.com/apache/nifi.git
NIFI-3036: When we replay a FlowFile, ensure that we are using the 'golden copy' of the associated Resource Claim, if the claim is still writable. Ensure that StandardResourceClaimManager retains the 'golden copy' of a Resource Claim until it is no longer writable and has a claim count of 0
This closes #1223 Signed-off-by: jpercivall <JPercivall@apache.org>
This commit is contained in:
parent
81357d4456
commit
721964b7d8
|
@ -3795,9 +3795,20 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
throw new IllegalStateException("Cannot replay data from Provenance Event because the Source FlowFile Queue with ID " + event.getSourceQueueIdentifier() + " no longer exists");
|
||||
}
|
||||
|
||||
// Create the ContentClaim
|
||||
final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
|
||||
event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false, false);
|
||||
// Create the ContentClaim. To do so, we first need the appropriate Resource Claim. Because we don't know whether or
|
||||
// not the Resource Claim is still active, we first call ResourceClaimManager.getResourceClaim. If this returns
|
||||
// null, then we know that the Resource Claim is no longer active and can just create a new one that is not writable.
|
||||
// It's critical though that we first call getResourceClaim because otherwise, if the Resource Claim is active and we
|
||||
// create a new one that is not writable, we could end up archiving or destroying the Resource Claim while it's still
|
||||
// being written to by the Content Repository. This is important only because we are creating a FlowFile with this Resource
|
||||
// Claim. If, for instance, we are simply creating the claim to request its content, as in #getContentAvailability, etc.
|
||||
// then this is not necessary.
|
||||
ResourceClaim resourceClaim = resourceClaimManager.getResourceClaim(event.getPreviousContentClaimContainer(),
|
||||
event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier());
|
||||
if (resourceClaim == null) {
|
||||
resourceClaim = resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
|
||||
event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false, false);
|
||||
}
|
||||
|
||||
// Increment Claimant Count, since we will now be referencing the Content Claim
|
||||
resourceClaimManager.incrementClaimantCount(resourceClaim);
|
||||
|
|
|
@ -943,7 +943,15 @@ public class FileSystemRepository implements ContentRepository {
|
|||
final long resourceClaimLength = scc.getOffset() + scc.getLength();
|
||||
if (recycle && resourceClaimLength < MAX_APPENDABLE_CLAIM_LENGTH) {
|
||||
final ClaimLengthPair pair = new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength);
|
||||
final boolean enqueued = writableClaimQueue.offer(pair);
|
||||
|
||||
// We are checking that writableClaimStreams contains the resource claim as a key, as a sanity check.
|
||||
// It should always be there. However, we have encountered a bug before where we archived content before
|
||||
// we should have. As a result, the Resource Claim and the associated OutputStream were removed from the
|
||||
// writableClaimStreams map, and this caused a NullPointerException. Worse, the call here to
|
||||
// writableClaimQueue.offer() means that the ResourceClaim was then reused, which resulted in an endless
|
||||
// loop of NullPointerException's being thrown. As a result, we simply ensure that the Resource Claim does
|
||||
// in fact have an OutputStream associated with it before adding it back to the writableClaimQueue.
|
||||
final boolean enqueued = writableClaimStreams.get(scc.getResourceClaim()) != null && writableClaimQueue.offer(pair);
|
||||
|
||||
if (enqueued) {
|
||||
LOG.debug("Claim length less than max; Adding {} back to Writable Claim Queue", this);
|
||||
|
|
|
@ -97,7 +97,10 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
|
|||
logger.debug("Decrementing claimant count for {} to {}", claim, newClaimantCount);
|
||||
}
|
||||
|
||||
if (newClaimantCount == 0) {
|
||||
// If the claim is no longer referenced, we want to remove it. We consider the claim to be "no longer referenced"
|
||||
// if the count is 0 and it is no longer writable (if it's writable, it may still be writable by the Content Repository,
|
||||
// even though no existing FlowFile is referencing the claim).
|
||||
if (newClaimantCount == 0 && !claim.isWritable()) {
|
||||
removeClaimantCount(claim);
|
||||
}
|
||||
return newClaimantCount;
|
||||
|
@ -188,6 +191,12 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
|
|||
}
|
||||
|
||||
((StandardResourceClaim) claim).freeze();
|
||||
|
||||
synchronized (claim) {
|
||||
if (getClaimantCount(claim) == 0) {
|
||||
claimantCounts.remove(claim);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue