diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java index 09202c0354..3b27d959aa 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.controller.repository; +import java.util.List; + import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.repository.claim.ContentClaim; @@ -79,4 +81,10 @@ public interface RepositoryRecord { * swapped out */ String getSwapLocation(); + + /** + * @return a List of Content Claims that are "transient," meaning that they existed only for the + * life of the Process Session in which they were created and should not be persisted. + */ + List getTransientClaims(); } diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java index 77e7f8415c..bb788963b7 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java @@ -51,4 +51,17 @@ public interface ResourceClaim extends Comparable { */ boolean isLossTolerant(); + /** + * @return true if the Resource Claim may still be written to, false if the Resource Claim + * will no longer be written to + */ + boolean isWritable(); + + /** + * Indicates whether or not the Resource Claim is in use. A Resource Claim is said to be in use if either it is + * writable or at least one Content Claim still refers to the it + * + * @return true if the Resource Claim is in use, false otherwise + */ + boolean isInUse(); } diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java index 01f4c65af5..b430df0a7c 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java @@ -132,4 +132,11 @@ public interface ResourceClaimManager { * about */ void purge(); + + /** + * Freezes the Resource Claim so that it can no longer be written to + * + * @param claim the resource claim to freeze + */ + void freeze(ResourceClaim claim); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 2635fc472b..77c3dd7b5f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -125,7 +125,6 @@ import org.apache.nifi.controller.repository.claim.ContentDirection; import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.StandardContentClaim; -import org.apache.nifi.controller.repository.claim.StandardResourceClaim; import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.controller.repository.io.LimitedInputStream; import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent; @@ -3532,7 +3531,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return null; } - final StandardResourceClaim resourceClaim = new StandardResourceClaim(container, section, identifier, false); + final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(container, section, identifier, false); return new StandardContentClaim(resourceClaim, offset == null ? 0L : offset.longValue()); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 27bbd69ced..77f82d50ca 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -1300,6 +1300,11 @@ public class StandardFlowFileQueue implements FlowFileQueue { public String getSwapLocation() { return null; } + + @Override + public List getTransientClaims() { + return Collections.emptyList(); + } }; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index e350a9056f..8cf2401a57 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -102,7 +102,7 @@ public class FileSystemRepository implements ContentRepository { // with creating and deleting too many files, as we had to delete 100's of thousands of files every 2 minutes // in order to avoid backpressure on session commits. With 1 MB as the target file size, 100's of thousands of // files would mean that we are writing gigabytes per second - quite a bit faster than any disks can handle now. - private final long maxAppendClaimLength = 1024L * 1024L; + static final int MAX_APPENDABLE_CLAIM_LENGTH = 1024 * 1024; // Queue for claims that are kept open for writing. Size of 100 is pretty arbitrary. Ideally, this will be at // least as large as the number of threads that will be updating the repository simultaneously but we don't want @@ -111,7 +111,6 @@ public class FileSystemRepository implements ContentRepository { // the OutputStream that we can use for writing to the claim. private final BlockingQueue writableClaimQueue = new LinkedBlockingQueue<>(100); private final ConcurrentMap writableClaimStreams = new ConcurrentHashMap<>(100); - private final Set activeResourceClaims = Collections.synchronizedSet(new HashSet()); private final boolean archiveData; private final long maxArchiveMillis; @@ -497,64 +496,53 @@ public class FileSystemRepository implements ContentRepository { public ContentClaim create(final boolean lossTolerant) throws IOException { ResourceClaim resourceClaim; - // We need to synchronize on this queue because the act of pulling something off - // the queue and incrementing the associated claimant count MUST be done atomically. - // This way, if the claimant count is decremented to 0, we can ensure that the - // claim is not then pulled from the queue and used as another thread is destroying/archiving - // the claim. The logic in the remove() method dictates that the underlying file can be - // deleted (or archived) only if the claimant count becomes <= 0 AND there is no other claim on - // the queue that references that file. As a result, we need to ensure that those two conditions - // can be evaluated atomically. In order for that to be the case, we need to also treat the - // removal of a claim from the queue and the incrementing of its claimant count as an atomic - // action to ensure that the comparison of those two conditions is atomic also. As a result, - // we will synchronize on the queue while performing those actions. final long resourceOffset; - synchronized (writableClaimQueue) { - final ClaimLengthPair pair = writableClaimQueue.poll(); - if (pair == null) { - final long currentIndex = index.incrementAndGet(); + final ClaimLengthPair pair = writableClaimQueue.poll(); + if (pair == null) { + final long currentIndex = index.incrementAndGet(); - String containerName = null; - boolean waitRequired = true; - ContainerState containerState = null; - for (long containerIndex = currentIndex; containerIndex < currentIndex + containers.size(); containerIndex++) { - final long modulatedContainerIndex = containerIndex % containers.size(); - containerName = containerNames.get((int) modulatedContainerIndex); + String containerName = null; + boolean waitRequired = true; + ContainerState containerState = null; + for (long containerIndex = currentIndex; containerIndex < currentIndex + containers.size(); containerIndex++) { + final long modulatedContainerIndex = containerIndex % containers.size(); + containerName = containerNames.get((int) modulatedContainerIndex); - containerState = containerStateMap.get(containerName); - if (!containerState.isWaitRequired()) { - waitRequired = false; - break; - } + containerState = containerStateMap.get(containerName); + if (!containerState.isWaitRequired()) { + waitRequired = false; + break; } - - if (waitRequired) { - containerState.waitForArchiveExpiration(); - } - - final long modulatedSectionIndex = currentIndex % SECTIONS_PER_CONTAINER; - final String section = String.valueOf(modulatedSectionIndex); - final String claimId = System.currentTimeMillis() + "-" + currentIndex; - - resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant); - resourceOffset = 0L; - LOG.debug("Creating new Resource Claim {}", resourceClaim); - - // we always append because there may be another ContentClaim using the same resource claim. - // However, we know that we will never write to the same claim from two different threads - // at the same time because we will call create() to get the claim before we write to it, - // and when we call create(), it will remove it from the Queue, which means that no other - // thread will get the same Claim until we've finished writing to it. - final File file = getPath(resourceClaim).toFile(); - ByteCountingOutputStream claimStream = new SynchronizedByteCountingOutputStream(new FileOutputStream(file, true), file.length()); - writableClaimStreams.put(resourceClaim, claimStream); - } else { - resourceClaim = pair.getClaim(); - resourceOffset = pair.getLength(); - LOG.debug("Reusing Resource Claim {}", resourceClaim); } - resourceClaimManager.incrementClaimantCount(resourceClaim, true); + if (waitRequired) { + containerState.waitForArchiveExpiration(); + } + + final long modulatedSectionIndex = currentIndex % SECTIONS_PER_CONTAINER; + final String section = String.valueOf(modulatedSectionIndex); + final String claimId = System.currentTimeMillis() + "-" + currentIndex; + + resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant); + resourceOffset = 0L; + LOG.debug("Creating new Resource Claim {}", resourceClaim); + + // we always append because there may be another ContentClaim using the same resource claim. + // However, we know that we will never write to the same claim from two different threads + // at the same time because we will call create() to get the claim before we write to it, + // and when we call create(), it will remove it from the Queue, which means that no other + // thread will get the same Claim until we've finished writing to it. + final File file = getPath(resourceClaim).toFile(); + ByteCountingOutputStream claimStream = new SynchronizedByteCountingOutputStream(new FileOutputStream(file, true), file.length()); + writableClaimStreams.put(resourceClaim, claimStream); + + incrementClaimantCount(resourceClaim, true); + } else { + resourceClaim = pair.getClaim(); + resourceOffset = pair.getLength(); + LOG.debug("Reusing Resource Claim {}", resourceClaim); + + incrementClaimantCount(resourceClaim, false); } final StandardContentClaim scc = new StandardContentClaim(resourceClaim, resourceOffset); @@ -563,18 +551,24 @@ public class FileSystemRepository implements ContentRepository { @Override public int incrementClaimaintCount(final ContentClaim claim) { - if (claim == null) { + return incrementClaimantCount(claim == null ? null : claim.getResourceClaim(), false); + } + + protected int incrementClaimantCount(final ResourceClaim resourceClaim, final boolean newClaim) { + if (resourceClaim == null) { return 0; } - return resourceClaimManager.incrementClaimantCount(claim.getResourceClaim()); + return resourceClaimManager.incrementClaimantCount(resourceClaim, newClaim); } + @Override public int getClaimantCount(final ContentClaim claim) { if (claim == null) { return 0; } + return resourceClaimManager.getClaimantCount(claim.getResourceClaim()); } @@ -584,8 +578,7 @@ public class FileSystemRepository implements ContentRepository { return 0; } - final int claimantCount = resourceClaimManager.decrementClaimantCount(claim.getResourceClaim()); - return claimantCount; + return resourceClaimManager.decrementClaimantCount(claim.getResourceClaim()); } @Override @@ -602,23 +595,9 @@ public class FileSystemRepository implements ContentRepository { return false; } - // we synchronize on the queue here because if the claimant count is 0, - // we need to be able to remove any instance of that resource claim from the - // queue atomically (i.e., the checking of the claimant count plus removal from the queue - // must be atomic). The create() method also synchronizes on the queue whenever it - // polls from the queue and increments a claimant count in order to ensure that these - // two conditions can be checked atomically. - synchronized (writableClaimQueue) { - final int claimantCount = resourceClaimManager.getClaimantCount(claim); - if (claimantCount > 0) { - // if other content claims are claiming the same resource, we have nothing to destroy, - // so just consider the destruction successful. - return true; - } - if (activeResourceClaims.contains(claim) || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) { - // If we have an open OutputStream for the claim, we will not destroy the claim. - return false; - } + // If the claim is still in use, we won't remove it. + if (claim.isInUse()) { + return false; } Path path = null; @@ -629,6 +608,7 @@ public class FileSystemRepository implements ContentRepository { // Ensure that we have no writable claim streams for this resource claim final ByteCountingOutputStream bcos = writableClaimStreams.remove(claim); + if (bcos != null) { try { bcos.close(); @@ -841,6 +821,7 @@ public class FileSystemRepository implements ContentRepository { return write(claim, false); } + private OutputStream write(final ContentClaim claim, final boolean append) throws IOException { if (claim == null) { throw new NullPointerException("ContentClaim cannot be null"); @@ -857,12 +838,9 @@ public class FileSystemRepository implements ContentRepository { throw new IllegalArgumentException("Cannot write to " + claim + " because it has already been written to."); } - final ResourceClaim resourceClaim = claim.getResourceClaim(); - ByteCountingOutputStream claimStream = writableClaimStreams.get(scc.getResourceClaim()); final int initialLength = append ? (int) Math.max(0, scc.getLength()) : 0; - activeResourceClaims.add(resourceClaim); final ByteCountingOutputStream bcos = claimStream; final OutputStream out = new OutputStream() { private long bytesWritten = 0L; @@ -937,7 +915,6 @@ public class FileSystemRepository implements ContentRepository { @Override public synchronized void close() throws IOException { closed = true; - activeResourceClaims.remove(resourceClaim); if (alwaysSync) { ((FileOutputStream) bcos.getWrappedStream()).getFD().sync(); @@ -953,20 +930,15 @@ public class FileSystemRepository implements ContentRepository { // is called. In this case, we don't have to actually close the file stream. Instead, we // can just add it onto the queue and continue to use it for the next content claim. final long resourceClaimLength = scc.getOffset() + scc.getLength(); - if (recycle && resourceClaimLength < maxAppendClaimLength) { - // we do not have to synchronize on the writable claim queue here because we - // are only adding something to the queue. We must synchronize if we are - // using a ResourceClaim from the queue and incrementing the claimant count on that resource - // because those need to be done atomically, or if we are destroying a claim that is on - // the queue because we need to ensure that the latter operation does not cause problems - // with the former. + if (recycle && resourceClaimLength < MAX_APPENDABLE_CLAIM_LENGTH) { final ClaimLengthPair pair = new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength); final boolean enqueued = writableClaimQueue.offer(pair); if (enqueued) { - LOG.debug("Claim length less than max; Leaving {} in writableClaimStreams map", this); + LOG.debug("Claim length less than max; Adding {} back to Writable Claim Queue", this); } else { writableClaimStreams.remove(scc.getResourceClaim()); + bcos.close(); LOG.debug("Claim length less than max; Closing {} because could not add back to queue", this); @@ -978,8 +950,12 @@ public class FileSystemRepository implements ContentRepository { // we've reached the limit for this claim. Don't add it back to our queue. // Instead, just remove it and move on. + // Mark the claim as no longer being able to be written to + resourceClaimManager.freeze(scc.getResourceClaim()); + // ensure that the claim is no longer on the queue writableClaimQueue.remove(new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength)); + bcos.close(); LOG.debug("Claim lenth >= max; Closing {}", this); if (LOG.isTraceEnabled()) { @@ -1109,11 +1085,8 @@ public class FileSystemRepository implements ContentRepository { return false; } - synchronized (writableClaimQueue) { - final int claimantCount = claim == null ? 0 : resourceClaimManager.getClaimantCount(claim); - if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) { - return false; - } + if (claim.isInUse()) { + return false; } // If the claim count is decremented to 0 (<= 0 as a 'defensive programming' strategy), ensure that @@ -1121,6 +1094,7 @@ public class FileSystemRepository implements ContentRepository { // claimant count is removed without writing to the claim (or more specifically, without closing the // OutputStream that is returned when calling write() ). final OutputStream out = writableClaimStreams.remove(claim); + if (out != null) { try { out.close(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index f6cb8a1b32..799cab8cc2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -2278,24 +2278,26 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final boolean contentModified = record.getWorkingClaim() != null && record.getWorkingClaim() != record.getOriginalClaim(); // If the working claim is not the same as the original claim, we have modified the content of - // the FlowFile, and we need to remove the newly created file (the working claim). However, if + // the FlowFile, and we need to remove the newly created content (the working claim). However, if // they are the same, we cannot just remove the claim because record.getWorkingClaim() will return // the original claim if the record is "working" but the content has not been modified // (e.g., in the case of attributes only were updated) + // // In other words: // If we modify the attributes of a FlowFile, and then we call record.getWorkingClaim(), this will // return the same claim as record.getOriginalClaim(). So we cannot just remove the working claim because // that may decrement the original claim (because the 2 claims are the same), and that's NOT what we want to do - // because we will do that later, in the session.commit() and that would result in removing the original claim twice. + // because we will do that later, in the session.commit() and that would result in decrementing the count for + // the original claim twice. if (contentModified) { - // In this case, it's ok to go ahead and destroy the content because we know that the working claim is going to be + // In this case, it's ok to decrement the claimant count for the content because we know that the working claim is going to be // updated and the given working claim is referenced only by FlowFiles in this session (because it's the Working Claim). - // Therefore, if this is the only record that refers to that Content Claim, we can destroy the claim. This happens, - // for instance, if a Processor modifies the content of a FlowFile more than once before committing the session. - final int claimantCount = context.getContentRepository().decrementClaimantCount(record.getWorkingClaim()); - if (claimantCount == 0) { - context.getContentRepository().remove(record.getWorkingClaim()); - } + // Therefore, we need to decrement the claimant count, and since the Working Claim is being changed, that means that + // the Working Claim is a transient claim (the content need not be persisted because no FlowFile refers to it). We cannot simply + // remove the content because there may be other FlowFiles that reference the same Resource Claim. Marking the Content Claim as + // transient, though, will result in the FlowFile Repository cleaning up as appropriate. + context.getContentRepository().decrementClaimantCount(record.getWorkingClaim()); + record.addTransientClaim(record.getWorkingClaim()); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java index c5be81ef65..8aa1caf80e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java @@ -16,8 +16,10 @@ */ package org.apache.nifi.controller.repository; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.nifi.controller.queue.FlowFileQueue; @@ -35,6 +37,7 @@ public class StandardRepositoryRecord implements RepositoryRecord { private String swapLocation; private final Map updatedAttributes = new HashMap<>(); private final Map originalAttributes; + private List transientClaims; /** * Creates a new record which has no original claim or flow file - it is entirely new @@ -199,4 +202,20 @@ public class StandardRepositoryRecord implements RepositoryRecord { public String toString() { return "StandardRepositoryRecord[UpdateType=" + getType() + ",Record=" + getCurrent() + "]"; } + + @Override + public List getTransientClaims() { + return transientClaims == null ? Collections. emptyList() : Collections.unmodifiableList(transientClaims); + } + + void addTransientClaim(final ContentClaim claim) { + if (claim == null) { + return; + } + + if (transientClaims == null) { + transientClaims = new ArrayList<>(); + } + transientClaims.add(claim); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index dae1cff776..2e5f00551a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -177,17 +177,17 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis claimManager.markDestructable(resourceClaim); } - private int getClaimantCount(final ContentClaim claim) { + private boolean isDestructable(final ContentClaim claim) { if (claim == null) { - return 0; + return false; } final ResourceClaim resourceClaim = claim.getResourceClaim(); if (resourceClaim == null) { - return 0; + return false; } - return claimManager.getClaimantCount(resourceClaim); + return !resourceClaim.isInUse(); } private void updateRepository(final Collection records, final boolean sync) throws IOException { @@ -211,21 +211,30 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis final Set claimsToAdd = new HashSet<>(); for (final RepositoryRecord record : records) { if (record.getType() == RepositoryRecordType.DELETE) { - // For any DELETE record that we have, if current claim's claimant count <= 0, mark it as destructable - if (record.getCurrentClaim() != null && getClaimantCount(record.getCurrentClaim()) <= 0) { + // For any DELETE record that we have, if claim is destructible, mark it so + if (record.getCurrentClaim() != null && isDestructable(record.getCurrentClaim())) { claimsToAdd.add(record.getCurrentClaim().getResourceClaim()); } - // If the original claim is different than the current claim and the original claim has a claimant count <= 0, mark it as destructable. - if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && getClaimantCount(record.getOriginalClaim()) <= 0) { + // If the original claim is different than the current claim and the original claim is destructible, mark it so + if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && isDestructable(record.getOriginalClaim())) { claimsToAdd.add(record.getOriginalClaim().getResourceClaim()); } } else if (record.getType() == RepositoryRecordType.UPDATE) { - // if we have an update, and the original is no longer needed, mark original as destructable - if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && getClaimantCount(record.getOriginalClaim()) <= 0) { + // if we have an update, and the original is no longer needed, mark original as destructible + if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && isDestructable(record.getOriginalClaim())) { claimsToAdd.add(record.getOriginalClaim().getResourceClaim()); } } + + final List transientClaims = record.getTransientClaims(); + if (transientClaims != null) { + for (final ContentClaim transientClaim : transientClaims) { + if (isDestructable(transientClaim)) { + claimsToAdd.add(transientClaim.getResourceClaim()); + } + } + } } if (!claimsToAdd.isEmpty()) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java index 4b27eaefd2..25dbaee264 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java @@ -16,17 +16,17 @@ */ package org.apache.nifi.controller.repository.claim; -import java.util.concurrent.atomic.AtomicInteger; - public class StandardResourceClaim implements ResourceClaim, Comparable { + private final StandardResourceClaimManager claimManager; private final String id; private final String container; private final String section; private final boolean lossTolerant; - private final AtomicInteger claimantCount = new AtomicInteger(0); private final int hashCode; + private volatile boolean writable = true; - public StandardResourceClaim(final String container, final String section, final String id, final boolean lossTolerant) { + public StandardResourceClaim(final StandardResourceClaimManager claimManager, final String container, final String section, final String id, final boolean lossTolerant) { + this.claimManager = claimManager; this.container = container.intern(); this.section = section.intern(); this.id = id; @@ -64,18 +64,6 @@ public class StandardResourceClaim implements ResourceClaim, Comparable 0; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java index 4826ac3454..9cb0fa1315 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java @@ -36,7 +36,7 @@ public class StandardResourceClaimManager implements ResourceClaimManager { @Override public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant) { - return new StandardResourceClaim(container, section, id, lossTolerant); + return new StandardResourceClaim(this, container, section, id, lossTolerant); } private static AtomicInteger getCounter(final ResourceClaim claim) { @@ -59,8 +59,11 @@ public class StandardResourceClaimManager implements ResourceClaimManager { if (claim == null) { return 0; } - final AtomicInteger counter = claimantCounts.get(claim); - return counter == null ? 0 : counter.get(); + + synchronized (claim) { + final AtomicInteger counter = claimantCounts.get(claim); + return counter == null ? 0 : counter.get(); + } } @Override @@ -69,18 +72,30 @@ public class StandardResourceClaimManager implements ResourceClaimManager { return 0; } - final AtomicInteger counter = claimantCounts.get(claim); - if (counter == null) { - logger.debug("Decrementing claimant count for {} but claimant count is not known. Returning -1", claim); - return -1; - } + synchronized (claim) { + final AtomicInteger counter = claimantCounts.get(claim); + if (counter == null) { + logger.warn("Decrementing claimant count for {} but claimant count is not known. Returning -1", claim); + return -1; + } - final int newClaimantCount = counter.decrementAndGet(); - logger.debug("Decrementing claimant count for {} to {}", claim, newClaimantCount); - if (newClaimantCount == 0) { - claimantCounts.remove(claim); + final int newClaimantCount = counter.decrementAndGet(); + if (newClaimantCount < 0) { + logger.error("Decremented claimant count for {} to {}", claim, newClaimantCount); + } else { + logger.debug("Decrementing claimant count for {} to {}", claim, newClaimantCount); + } + + if (newClaimantCount == 0) { + removeClaimantCount(claim); + } + return newClaimantCount; } - return newClaimantCount; + } + + // protected so that it can be used in unit tests + protected void removeClaimantCount(final ResourceClaim claim) { + claimantCounts.remove(claim); } @Override @@ -90,15 +105,22 @@ public class StandardResourceClaimManager implements ResourceClaimManager { @Override public int incrementClaimantCount(final ResourceClaim claim, final boolean newClaim) { - final AtomicInteger counter = getCounter(claim); - - final int newClaimantCount = counter.incrementAndGet(); - logger.debug("Incrementing claimant count for {} to {}", claim, newClaimantCount); - // If the claimant count moved from 0 to 1, remove it from the queue of destructable claims. - if (!newClaim && newClaimantCount == 1) { - destructableClaims.remove(claim); + if (claim == null) { + return 0; + } + + synchronized (claim) { + final AtomicInteger counter = getCounter(claim); + + final int newClaimantCount = counter.incrementAndGet(); + logger.debug("Incrementing claimant count for {} to {}", claim, newClaimantCount); + + // If the claimant count moved from 0 to 1, remove it from the queue of destructable claims. + if (!newClaim && newClaimantCount == 1) { + destructableClaims.remove(claim); + } + return newClaimantCount; } - return newClaimantCount; } @Override @@ -107,15 +129,17 @@ public class StandardResourceClaimManager implements ResourceClaimManager { return; } - if (getClaimantCount(claim) > 0) { - return; - } - - logger.debug("Marking claim {} as destructable", claim); - try { - while (!destructableClaims.offer(claim, 30, TimeUnit.MINUTES)) { + synchronized (claim) { + if (getClaimantCount(claim) > 0) { + return; + } + + logger.debug("Marking claim {} as destructable", claim); + try { + while (!destructableClaims.offer(claim, 30, TimeUnit.MINUTES)) { + } + } catch (final InterruptedException ie) { } - } catch (final InterruptedException ie) { } } @@ -142,4 +166,16 @@ public class StandardResourceClaimManager implements ResourceClaimManager { claimantCounts.clear(); } + @Override + public void freeze(final ResourceClaim claim) { + if (claim == null) { + return; + } + + if (!(claim instanceof StandardResourceClaim)) { + throw new IllegalArgumentException("The given resource claim is not managed by this Resource Claim Manager"); + } + + ((StandardResourceClaim) claim).freeze(); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java index 4a590f210b..15a8267ec9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java @@ -156,6 +156,10 @@ public class TestFileSystemSwapManager { @Override public void purge() { } + + @Override + public void freeze(ResourceClaim claim) { + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java index b36a5e67be..2a82aedaf8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java @@ -257,8 +257,8 @@ public class TestFileSystemRepository { final Path claimPath = getPath(claim); // Create the file. - try (final OutputStream out = Files.newOutputStream(claimPath, StandardOpenOption.CREATE)) { - out.write("Hello".getBytes()); + try (final OutputStream out = repository.write(claim)) { + out.write(new byte[FileSystemRepository.MAX_APPENDABLE_CLAIM_LENGTH]); } int count = repository.decrementClaimantCount(claim); @@ -379,13 +379,13 @@ public class TestFileSystemRepository { @Test(expected = ContentNotFoundException.class) public void testSizeWithNoContent() throws IOException { - final ContentClaim claim = new StandardContentClaim(new StandardResourceClaim("container1", "section 1", "1", false), 0L); + final ContentClaim claim = new StandardContentClaim(new StandardResourceClaim(claimManager, "container1", "section 1", "1", false), 0L); assertEquals(0L, repository.size(claim)); } @Test(expected = ContentNotFoundException.class) public void testReadWithNoContent() throws IOException { - final ContentClaim claim = new StandardContentClaim(new StandardResourceClaim("container1", "section 1", "1", false), 0L); + final ContentClaim claim = new StandardContentClaim(new StandardResourceClaim(claimManager, "container1", "section 1", "1", false), 0L); final InputStream in = repository.read(claim); in.close(); } @@ -427,12 +427,12 @@ public class TestFileSystemRepository { // write at least 1 MB to the output stream so that when we close the output stream // the repo won't keep the stream open. - final byte[] buff = new byte[1024 * 1024]; + final byte[] buff = new byte[FileSystemRepository.MAX_APPENDABLE_CLAIM_LENGTH]; out.write(buff); out.write(buff); - // true because claimant count is still 1. - assertTrue(repository.remove(claim)); + // false because claimant count is still 1, so the resource claim was not removed + assertFalse(repository.remove(claim)); assertEquals(0, repository.decrementClaimantCount(claim)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 2f3bff5216..658e8c0f19 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -63,7 +63,6 @@ import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.StandardContentClaim; -import org.apache.nifi.controller.repository.claim.StandardResourceClaim; import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -99,6 +98,7 @@ public class TestStandardProcessSession { private ProvenanceEventRepository provenanceRepo; private MockFlowFileRepository flowFileRepo; private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build(); + private static StandardResourceClaimManager resourceClaimManager; @After public void cleanup() { @@ -136,6 +136,8 @@ public class TestStandardProcessSession { @Before @SuppressWarnings("unchecked") public void setup() throws IOException { + resourceClaimManager = new StandardResourceClaimManager(); + System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties"); final FlowFileEventRepository flowFileEventRepo = Mockito.mock(FlowFileEventRepository.class); final CounterRepository counterRepo = Mockito.mock(CounterRepository.class); @@ -815,7 +817,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) .size(1L) .build(); flowFileQueue.put(flowFileRecord); @@ -963,7 +965,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) .size(1L) .build(); flowFileQueue.put(flowFileRecord); @@ -987,7 +989,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) .build(); flowFileQueue.put(flowFileRecord); @@ -1003,7 +1005,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) .contentClaimOffset(1000L) .size(1000L) .build(); @@ -1028,7 +1030,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) .build(); flowFileQueue.put(flowFileRecord); @@ -1045,7 +1047,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) .contentClaimOffset(1000L).size(1L).build(); flowFileQueue.put(flowFileRecord2); @@ -1114,7 +1116,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) .contentClaimOffset(0L).size(0L).build(); flowFileQueue.put(flowFileRecord); @@ -1152,7 +1154,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) .contentClaimOffset(0L).size(0L).build(); flowFileQueue.put(flowFileRecord); @@ -1479,7 +1481,7 @@ public class TestStandardProcessSession { final Set claims = new HashSet<>(); for (long i = 0; i < idGenerator.get(); i++) { - final ResourceClaim resourceClaim = new StandardResourceClaim("container", "section", String.valueOf(i), false); + final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim("container", "section", String.valueOf(i), false); final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L); if (getClaimantCount(contentClaim) > 0) { claims.add(contentClaim); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java index 55b7426016..51b654f7b0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java @@ -44,7 +44,6 @@ import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.StandardContentClaim; -import org.apache.nifi.controller.repository.claim.StandardResourceClaim; import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.controller.swap.StandardSwapContents; import org.apache.nifi.controller.swap.StandardSwapSummary; @@ -87,10 +86,10 @@ public class TestWriteAheadFlowFileRepository { when(connection.getFlowFileQueue()).thenReturn(queue); queueProvider.addConnection(connection); - final ResourceClaim resourceClaim1 = new StandardResourceClaim("container", "section", "1", false); + final ResourceClaim resourceClaim1 = claimManager.newResourceClaim("container", "section", "1", false); final ContentClaim claim1 = new StandardContentClaim(resourceClaim1, 0L); - final ResourceClaim resourceClaim2 = new StandardResourceClaim("container", "section", "2", false); + final ResourceClaim resourceClaim2 = claimManager.newResourceClaim("container", "section", "2", false); final ContentClaim claim2 = new StandardContentClaim(resourceClaim2, 0L); // Create a flowfile repo, update it once with a FlowFile that points to one resource claim. Then, diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java new file mode 100644 index 0000000000..d29105a300 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository.claim; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +public class TestStandardResourceClaimManager { + + @Test + @Ignore("Unit test was created to repeat a concurrency bug in StandardResourceClaimManager. " + + "However, now that the concurrency bug has been fixed, the test will deadlock. Leaving here for now in case it's valuable before the commit is pushed") + public void testIncrementAndDecrementThreadSafety() throws InterruptedException { + final AtomicBoolean waitToRemove = new AtomicBoolean(true); + final CountDownLatch decrementComplete = new CountDownLatch(1); + + final StandardResourceClaimManager manager = new StandardResourceClaimManager() { + @Override + protected void removeClaimantCount(final ResourceClaim claim) { + decrementComplete.countDown(); + + while (waitToRemove.get()) { + try { + Thread.sleep(10L); + } catch (final InterruptedException ie) { + Assert.fail("Interrupted while waiting to remove claimant count"); + } + } + + super.removeClaimantCount(claim); + } + }; + + final ResourceClaim resourceClaim = manager.newResourceClaim("container", "section", "id", false); + assertEquals(1, manager.incrementClaimantCount(resourceClaim)); // increment claimant count to 1. + + assertEquals(1, manager.getClaimantCount(resourceClaim)); + + // Decrement the claimant count. This should decrement the count to 0. However, we have 'waitToRemove' set to true, + // so the manager will not actually remove the claimant count (or return from this method) until we set 'waitToRemove' + // to false. We do this so that we can increment the claimant count in a separate thread. Because we will be incrementing + // the count in 1 thread and decrementing it in another thread, the end result should be that the claimant count is still + // at 1. + final Runnable decrementCountRunnable = new Runnable() { + @Override + public void run() { + manager.decrementClaimantCount(resourceClaim); + } + }; + + final Runnable incrementCountRunnable = new Runnable() { + @Override + public void run() { + // Wait until the count has been decremented + try { + decrementComplete.await(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.toString()); + } + + // Increment the claimant count + manager.incrementClaimantCount(resourceClaim); + + // allow the 'decrement Thread' to complete + waitToRemove.set(false); + } + }; + + // Start the threads so that the claim count is incremented and decremented at the same time + final Thread decrementThread = new Thread(decrementCountRunnable); + final Thread incrementThread = new Thread(incrementCountRunnable); + + decrementThread.start(); + incrementThread.start(); + + // Wait for both threads to complete + incrementThread.join(); + decrementThread.join(); + + // claimant count should still be 1, since 1 thread incremented it and 1 thread decremented it! + assertEquals(1, manager.getClaimantCount(resourceClaim)); + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml index 6bbe80025e..09cc037d8f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml @@ -33,4 +33,6 @@ + + diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java index 1374c10397..cd5ce54f61 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java @@ -16,8 +16,22 @@ */ package org.apache.nifi.processors.standard; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.http.annotation.ThreadSafe; import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -25,24 +39,15 @@ import org.apache.nifi.components.Validator; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import java.lang.reflect.InvocationTargetException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; - @ThreadSafe() @EventDriven() @Tags({"test", "debug", "processor", "utility", "flow", "FlowFile"}) @@ -167,6 +172,20 @@ public class DebugFlow extends AbstractProcessor { } }) .build(); + static final PropertyDescriptor WRITE_ITERATIONS = new PropertyDescriptor.Builder() + .name("Write Iterations") + .description("Number of times to write to the FlowFile") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .required(true) + .defaultValue("0") + .build(); + static final PropertyDescriptor CONTENT_SIZE = new PropertyDescriptor.Builder() + .name("Content Size") + .description("The number of bytes to write each time that the FlowFile is written to") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .required(true) + .defaultValue("1 KB") + .build(); private volatile Integer flowFileMaxSuccess = 0; private volatile Integer flowFileMaxFailure = 0; @@ -225,6 +244,8 @@ public class DebugFlow extends AbstractProcessor { propList.add(NO_FF_EXCEPTION_ITERATIONS); propList.add(NO_FF_YIELD_ITERATIONS); propList.add(NO_FF_EXCEPTION_CLASS); + propList.add(WRITE_ITERATIONS); + propList.add(CONTENT_SIZE); propertyDescriptors.compareAndSet(null, Collections.unmodifiableList(propList)); } return propertyDescriptors.get(); @@ -304,6 +325,23 @@ public class DebugFlow extends AbstractProcessor { } return; } else { + final int writeIterations = context.getProperty(WRITE_ITERATIONS).asInteger(); + if (writeIterations > 0 && pass == 1) { + final Random random = new Random(); + + for (int i = 0; i < writeIterations; i++) { + final byte[] data = new byte[context.getProperty(CONTENT_SIZE).asDataSize(DataUnit.B).intValue()]; + random.nextBytes(data); + + ff = session.write(ff, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write(data); + } + }); + } + } + if (curr_ff_resp.state() == FlowFileResponseState.FF_SUCCESS_RESPONSE) { if (flowFileCurrSuccess < flowFileMaxSuccess) { flowFileCurrSuccess += 1; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index d23d88dd19..c1f9a7796e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -88,4 +88,4 @@ org.apache.nifi.processors.standard.TransformXml org.apache.nifi.processors.standard.UnpackContent org.apache.nifi.processors.standard.ValidateXml org.apache.nifi.processors.standard.ExecuteSQL -org.apache.nifi.processors.standard.FetchDistributedMapCache +org.apache.nifi.processors.standard.FetchDistributedMapCache \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDebugFlow.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDebugFlow.java index 8eb53aa406..5aa2e1ebf4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDebugFlow.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDebugFlow.java @@ -76,16 +76,6 @@ public class TestDebugFlow { runner.setProperty(DebugFlow.NO_FF_YIELD_ITERATIONS, "0"); } - @Test - public void testGetSupportedPropertyDescriptors() throws Exception { - assertEquals(11, debugFlow.getPropertyDescriptors().size()); - } - - @Test - public void testGetRelationships() throws Exception { - assertEquals(2, debugFlow.getRelationships().size()); - } - private boolean isInContents(byte[] content) { for (Map.Entry entry : contents.entrySet()) { if (((String)entry.getValue()).compareTo(new String(content)) == 0) {