NIFI-2551: Addressed a threading issue in the StandardResourceClaimManager and performed some refactoring so that we can ensure thread-safety across different components of the application, such as ProcessSession and WriteAheadFlowFile Repository when interacting with the StandardResourceClaimManager. Update DebugFlow to allow it to write to a FlowFile multiple times, which exposes the concurrency bug. Also avoided calling ContentRepository.remove() from ProcessSession whenever the content is no longer needed, as that can cause problems now that the Resource Claim is backing the content claim.

This commit is contained in:
Mark Payne 2016-08-12 13:56:03 -04:00
parent ed42f2e3e3
commit 6af768d0a1
20 changed files with 424 additions and 200 deletions

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.nifi.controller.repository; package org.apache.nifi.controller.repository;
import java.util.List;
import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ContentClaim;
@ -79,4 +81,10 @@ public interface RepositoryRecord {
* swapped out * swapped out
*/ */
String getSwapLocation(); String getSwapLocation();
/**
* @return a List of Content Claims that are "transient," meaning that they existed only for the
* life of the Process Session in which they were created and should not be persisted.
*/
List<ContentClaim> getTransientClaims();
} }

View File

@ -51,4 +51,17 @@ public interface ResourceClaim extends Comparable<ResourceClaim> {
*/ */
boolean isLossTolerant(); boolean isLossTolerant();
/**
* @return <code>true</code> if the Resource Claim may still be written to, <code>false</code> if the Resource Claim
* will no longer be written to
*/
boolean isWritable();
/**
* Indicates whether or not the Resource Claim is in use. A Resource Claim is said to be in use if either it is
 * writable or at least one Content Claim still refers to it
*
* @return <code>true</code> if the Resource Claim is in use, <code>false</code> otherwise
*/
boolean isInUse();
} }

View File

@ -132,4 +132,11 @@ public interface ResourceClaimManager {
* about * about
*/ */
void purge(); void purge();
/**
* Freezes the Resource Claim so that it can no longer be written to
*
* @param claim the resource claim to freeze
*/
void freeze(ResourceClaim claim);
} }

View File

@ -125,7 +125,6 @@ import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.io.LimitedInputStream; import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent; import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent;
@ -3532,7 +3531,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return null; return null;
} }
final StandardResourceClaim resourceClaim = new StandardResourceClaim(container, section, identifier, false); final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(container, section, identifier, false);
return new StandardContentClaim(resourceClaim, offset == null ? 0L : offset.longValue()); return new StandardContentClaim(resourceClaim, offset == null ? 0L : offset.longValue());
} }

View File

@ -1300,6 +1300,11 @@ public class StandardFlowFileQueue implements FlowFileQueue {
public String getSwapLocation() { public String getSwapLocation() {
return null; return null;
} }
@Override
public List<ContentClaim> getTransientClaims() {
return Collections.emptyList();
}
}; };
} }

View File

@ -102,7 +102,7 @@ public class FileSystemRepository implements ContentRepository {
// with creating and deleting too many files, as we had to delete 100's of thousands of files every 2 minutes // with creating and deleting too many files, as we had to delete 100's of thousands of files every 2 minutes
// in order to avoid backpressure on session commits. With 1 MB as the target file size, 100's of thousands of // in order to avoid backpressure on session commits. With 1 MB as the target file size, 100's of thousands of
// files would mean that we are writing gigabytes per second - quite a bit faster than any disks can handle now. // files would mean that we are writing gigabytes per second - quite a bit faster than any disks can handle now.
private final long maxAppendClaimLength = 1024L * 1024L; static final int MAX_APPENDABLE_CLAIM_LENGTH = 1024 * 1024;
// Queue for claims that are kept open for writing. Size of 100 is pretty arbitrary. Ideally, this will be at // Queue for claims that are kept open for writing. Size of 100 is pretty arbitrary. Ideally, this will be at
// least as large as the number of threads that will be updating the repository simultaneously but we don't want // least as large as the number of threads that will be updating the repository simultaneously but we don't want
@ -111,7 +111,6 @@ public class FileSystemRepository implements ContentRepository {
// the OutputStream that we can use for writing to the claim. // the OutputStream that we can use for writing to the claim.
private final BlockingQueue<ClaimLengthPair> writableClaimQueue = new LinkedBlockingQueue<>(100); private final BlockingQueue<ClaimLengthPair> writableClaimQueue = new LinkedBlockingQueue<>(100);
private final ConcurrentMap<ResourceClaim, ByteCountingOutputStream> writableClaimStreams = new ConcurrentHashMap<>(100); private final ConcurrentMap<ResourceClaim, ByteCountingOutputStream> writableClaimStreams = new ConcurrentHashMap<>(100);
private final Set<ResourceClaim> activeResourceClaims = Collections.synchronizedSet(new HashSet<ResourceClaim>());
private final boolean archiveData; private final boolean archiveData;
private final long maxArchiveMillis; private final long maxArchiveMillis;
@ -497,19 +496,7 @@ public class FileSystemRepository implements ContentRepository {
public ContentClaim create(final boolean lossTolerant) throws IOException { public ContentClaim create(final boolean lossTolerant) throws IOException {
ResourceClaim resourceClaim; ResourceClaim resourceClaim;
// We need to synchronize on this queue because the act of pulling something off
// the queue and incrementing the associated claimant count MUST be done atomically.
// This way, if the claimant count is decremented to 0, we can ensure that the
// claim is not then pulled from the queue and used as another thread is destroying/archiving
// the claim. The logic in the remove() method dictates that the underlying file can be
// deleted (or archived) only if the claimant count becomes <= 0 AND there is no other claim on
// the queue that references that file. As a result, we need to ensure that those two conditions
// can be evaluated atomically. In order for that to be the case, we need to also treat the
// removal of a claim from the queue and the incrementing of its claimant count as an atomic
// action to ensure that the comparison of those two conditions is atomic also. As a result,
// we will synchronize on the queue while performing those actions.
final long resourceOffset; final long resourceOffset;
synchronized (writableClaimQueue) {
final ClaimLengthPair pair = writableClaimQueue.poll(); final ClaimLengthPair pair = writableClaimQueue.poll();
if (pair == null) { if (pair == null) {
final long currentIndex = index.incrementAndGet(); final long currentIndex = index.incrementAndGet();
@ -548,13 +535,14 @@ public class FileSystemRepository implements ContentRepository {
final File file = getPath(resourceClaim).toFile(); final File file = getPath(resourceClaim).toFile();
ByteCountingOutputStream claimStream = new SynchronizedByteCountingOutputStream(new FileOutputStream(file, true), file.length()); ByteCountingOutputStream claimStream = new SynchronizedByteCountingOutputStream(new FileOutputStream(file, true), file.length());
writableClaimStreams.put(resourceClaim, claimStream); writableClaimStreams.put(resourceClaim, claimStream);
incrementClaimantCount(resourceClaim, true);
} else { } else {
resourceClaim = pair.getClaim(); resourceClaim = pair.getClaim();
resourceOffset = pair.getLength(); resourceOffset = pair.getLength();
LOG.debug("Reusing Resource Claim {}", resourceClaim); LOG.debug("Reusing Resource Claim {}", resourceClaim);
}
resourceClaimManager.incrementClaimantCount(resourceClaim, true); incrementClaimantCount(resourceClaim, false);
} }
final StandardContentClaim scc = new StandardContentClaim(resourceClaim, resourceOffset); final StandardContentClaim scc = new StandardContentClaim(resourceClaim, resourceOffset);
@ -563,18 +551,24 @@ public class FileSystemRepository implements ContentRepository {
@Override @Override
public int incrementClaimaintCount(final ContentClaim claim) { public int incrementClaimaintCount(final ContentClaim claim) {
if (claim == null) { return incrementClaimantCount(claim == null ? null : claim.getResourceClaim(), false);
}
protected int incrementClaimantCount(final ResourceClaim resourceClaim, final boolean newClaim) {
if (resourceClaim == null) {
return 0; return 0;
} }
return resourceClaimManager.incrementClaimantCount(claim.getResourceClaim()); return resourceClaimManager.incrementClaimantCount(resourceClaim, newClaim);
} }
@Override @Override
public int getClaimantCount(final ContentClaim claim) { public int getClaimantCount(final ContentClaim claim) {
if (claim == null) { if (claim == null) {
return 0; return 0;
} }
return resourceClaimManager.getClaimantCount(claim.getResourceClaim()); return resourceClaimManager.getClaimantCount(claim.getResourceClaim());
} }
@ -584,8 +578,7 @@ public class FileSystemRepository implements ContentRepository {
return 0; return 0;
} }
final int claimantCount = resourceClaimManager.decrementClaimantCount(claim.getResourceClaim()); return resourceClaimManager.decrementClaimantCount(claim.getResourceClaim());
return claimantCount;
} }
@Override @Override
@ -602,24 +595,10 @@ public class FileSystemRepository implements ContentRepository {
return false; return false;
} }
// we synchronize on the queue here because if the claimant count is 0, // If the claim is still in use, we won't remove it.
// we need to be able to remove any instance of that resource claim from the if (claim.isInUse()) {
// queue atomically (i.e., the checking of the claimant count plus removal from the queue
// must be atomic). The create() method also synchronizes on the queue whenever it
// polls from the queue and increments a claimant count in order to ensure that these
// two conditions can be checked atomically.
synchronized (writableClaimQueue) {
final int claimantCount = resourceClaimManager.getClaimantCount(claim);
if (claimantCount > 0) {
// if other content claims are claiming the same resource, we have nothing to destroy,
// so just consider the destruction successful.
return true;
}
if (activeResourceClaims.contains(claim) || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) {
// If we have an open OutputStream for the claim, we will not destroy the claim.
return false; return false;
} }
}
Path path = null; Path path = null;
try { try {
@ -629,6 +608,7 @@ public class FileSystemRepository implements ContentRepository {
// Ensure that we have no writable claim streams for this resource claim // Ensure that we have no writable claim streams for this resource claim
final ByteCountingOutputStream bcos = writableClaimStreams.remove(claim); final ByteCountingOutputStream bcos = writableClaimStreams.remove(claim);
if (bcos != null) { if (bcos != null) {
try { try {
bcos.close(); bcos.close();
@ -841,6 +821,7 @@ public class FileSystemRepository implements ContentRepository {
return write(claim, false); return write(claim, false);
} }
private OutputStream write(final ContentClaim claim, final boolean append) throws IOException { private OutputStream write(final ContentClaim claim, final boolean append) throws IOException {
if (claim == null) { if (claim == null) {
throw new NullPointerException("ContentClaim cannot be null"); throw new NullPointerException("ContentClaim cannot be null");
@ -857,12 +838,9 @@ public class FileSystemRepository implements ContentRepository {
throw new IllegalArgumentException("Cannot write to " + claim + " because it has already been written to."); throw new IllegalArgumentException("Cannot write to " + claim + " because it has already been written to.");
} }
final ResourceClaim resourceClaim = claim.getResourceClaim();
ByteCountingOutputStream claimStream = writableClaimStreams.get(scc.getResourceClaim()); ByteCountingOutputStream claimStream = writableClaimStreams.get(scc.getResourceClaim());
final int initialLength = append ? (int) Math.max(0, scc.getLength()) : 0; final int initialLength = append ? (int) Math.max(0, scc.getLength()) : 0;
activeResourceClaims.add(resourceClaim);
final ByteCountingOutputStream bcos = claimStream; final ByteCountingOutputStream bcos = claimStream;
final OutputStream out = new OutputStream() { final OutputStream out = new OutputStream() {
private long bytesWritten = 0L; private long bytesWritten = 0L;
@ -937,7 +915,6 @@ public class FileSystemRepository implements ContentRepository {
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
closed = true; closed = true;
activeResourceClaims.remove(resourceClaim);
if (alwaysSync) { if (alwaysSync) {
((FileOutputStream) bcos.getWrappedStream()).getFD().sync(); ((FileOutputStream) bcos.getWrappedStream()).getFD().sync();
@ -953,20 +930,15 @@ public class FileSystemRepository implements ContentRepository {
// is called. In this case, we don't have to actually close the file stream. Instead, we // is called. In this case, we don't have to actually close the file stream. Instead, we
// can just add it onto the queue and continue to use it for the next content claim. // can just add it onto the queue and continue to use it for the next content claim.
final long resourceClaimLength = scc.getOffset() + scc.getLength(); final long resourceClaimLength = scc.getOffset() + scc.getLength();
if (recycle && resourceClaimLength < maxAppendClaimLength) { if (recycle && resourceClaimLength < MAX_APPENDABLE_CLAIM_LENGTH) {
// we do not have to synchronize on the writable claim queue here because we
// are only adding something to the queue. We must synchronize if we are
// using a ResourceClaim from the queue and incrementing the claimant count on that resource
// because those need to be done atomically, or if we are destroying a claim that is on
// the queue because we need to ensure that the latter operation does not cause problems
// with the former.
final ClaimLengthPair pair = new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength); final ClaimLengthPair pair = new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength);
final boolean enqueued = writableClaimQueue.offer(pair); final boolean enqueued = writableClaimQueue.offer(pair);
if (enqueued) { if (enqueued) {
LOG.debug("Claim length less than max; Leaving {} in writableClaimStreams map", this); LOG.debug("Claim length less than max; Adding {} back to Writable Claim Queue", this);
} else { } else {
writableClaimStreams.remove(scc.getResourceClaim()); writableClaimStreams.remove(scc.getResourceClaim());
bcos.close(); bcos.close();
LOG.debug("Claim length less than max; Closing {} because could not add back to queue", this); LOG.debug("Claim length less than max; Closing {} because could not add back to queue", this);
@ -978,8 +950,12 @@ public class FileSystemRepository implements ContentRepository {
// we've reached the limit for this claim. Don't add it back to our queue. // we've reached the limit for this claim. Don't add it back to our queue.
// Instead, just remove it and move on. // Instead, just remove it and move on.
// Mark the claim as no longer being able to be written to
resourceClaimManager.freeze(scc.getResourceClaim());
// ensure that the claim is no longer on the queue // ensure that the claim is no longer on the queue
writableClaimQueue.remove(new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength)); writableClaimQueue.remove(new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength));
bcos.close(); bcos.close();
LOG.debug("Claim lenth >= max; Closing {}", this); LOG.debug("Claim lenth >= max; Closing {}", this);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
@ -1109,18 +1085,16 @@ public class FileSystemRepository implements ContentRepository {
return false; return false;
} }
synchronized (writableClaimQueue) { if (claim.isInUse()) {
final int claimantCount = claim == null ? 0 : resourceClaimManager.getClaimantCount(claim);
if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) {
return false; return false;
} }
}
// If the claim count is decremented to 0 (<= 0 as a 'defensive programming' strategy), ensure that // If the claim count is decremented to 0 (<= 0 as a 'defensive programming' strategy), ensure that
// we close the stream if there is one. There may be a stream open if create() is called and then // we close the stream if there is one. There may be a stream open if create() is called and then
// claimant count is removed without writing to the claim (or more specifically, without closing the // claimant count is removed without writing to the claim (or more specifically, without closing the
// OutputStream that is returned when calling write() ). // OutputStream that is returned when calling write() ).
final OutputStream out = writableClaimStreams.remove(claim); final OutputStream out = writableClaimStreams.remove(claim);
if (out != null) { if (out != null) {
try { try {
out.close(); out.close();

View File

@ -2278,24 +2278,26 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final boolean contentModified = record.getWorkingClaim() != null && record.getWorkingClaim() != record.getOriginalClaim(); final boolean contentModified = record.getWorkingClaim() != null && record.getWorkingClaim() != record.getOriginalClaim();
// If the working claim is not the same as the original claim, we have modified the content of // If the working claim is not the same as the original claim, we have modified the content of
// the FlowFile, and we need to remove the newly created file (the working claim). However, if // the FlowFile, and we need to remove the newly created content (the working claim). However, if
// they are the same, we cannot just remove the claim because record.getWorkingClaim() will return // they are the same, we cannot just remove the claim because record.getWorkingClaim() will return
// the original claim if the record is "working" but the content has not been modified // the original claim if the record is "working" but the content has not been modified
// (e.g., in the case of attributes only were updated) // (e.g., in the case of attributes only were updated)
//
// In other words: // In other words:
// If we modify the attributes of a FlowFile, and then we call record.getWorkingClaim(), this will // If we modify the attributes of a FlowFile, and then we call record.getWorkingClaim(), this will
// return the same claim as record.getOriginalClaim(). So we cannot just remove the working claim because // return the same claim as record.getOriginalClaim(). So we cannot just remove the working claim because
// that may decrement the original claim (because the 2 claims are the same), and that's NOT what we want to do // that may decrement the original claim (because the 2 claims are the same), and that's NOT what we want to do
// because we will do that later, in the session.commit() and that would result in removing the original claim twice. // because we will do that later, in the session.commit() and that would result in decrementing the count for
// the original claim twice.
if (contentModified) { if (contentModified) {
// In this case, it's ok to go ahead and destroy the content because we know that the working claim is going to be // In this case, it's ok to decrement the claimant count for the content because we know that the working claim is going to be
// updated and the given working claim is referenced only by FlowFiles in this session (because it's the Working Claim). // updated and the given working claim is referenced only by FlowFiles in this session (because it's the Working Claim).
// Therefore, if this is the only record that refers to that Content Claim, we can destroy the claim. This happens, // Therefore, we need to decrement the claimant count, and since the Working Claim is being changed, that means that
// for instance, if a Processor modifies the content of a FlowFile more than once before committing the session. // the Working Claim is a transient claim (the content need not be persisted because no FlowFile refers to it). We cannot simply
final int claimantCount = context.getContentRepository().decrementClaimantCount(record.getWorkingClaim()); // remove the content because there may be other FlowFiles that reference the same Resource Claim. Marking the Content Claim as
if (claimantCount == 0) { // transient, though, will result in the FlowFile Repository cleaning up as appropriate.
context.getContentRepository().remove(record.getWorkingClaim()); context.getContentRepository().decrementClaimantCount(record.getWorkingClaim());
} record.addTransientClaim(record.getWorkingClaim());
} }
} }

View File

@ -16,8 +16,10 @@
*/ */
package org.apache.nifi.controller.repository; package org.apache.nifi.controller.repository;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueue;
@ -35,6 +37,7 @@ public class StandardRepositoryRecord implements RepositoryRecord {
private String swapLocation; private String swapLocation;
private final Map<String, String> updatedAttributes = new HashMap<>(); private final Map<String, String> updatedAttributes = new HashMap<>();
private final Map<String, String> originalAttributes; private final Map<String, String> originalAttributes;
private List<ContentClaim> transientClaims;
/** /**
* Creates a new record which has no original claim or flow file - it is entirely new * Creates a new record which has no original claim or flow file - it is entirely new
@ -199,4 +202,20 @@ public class StandardRepositoryRecord implements RepositoryRecord {
public String toString() { public String toString() {
return "StandardRepositoryRecord[UpdateType=" + getType() + ",Record=" + getCurrent() + "]"; return "StandardRepositoryRecord[UpdateType=" + getType() + ",Record=" + getCurrent() + "]";
} }
@Override
public List<ContentClaim> getTransientClaims() {
return transientClaims == null ? Collections.<ContentClaim> emptyList() : Collections.unmodifiableList(transientClaims);
}
void addTransientClaim(final ContentClaim claim) {
if (claim == null) {
return;
}
if (transientClaims == null) {
transientClaims = new ArrayList<>();
}
transientClaims.add(claim);
}
} }

View File

@ -177,17 +177,17 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
claimManager.markDestructable(resourceClaim); claimManager.markDestructable(resourceClaim);
} }
private int getClaimantCount(final ContentClaim claim) { private boolean isDestructable(final ContentClaim claim) {
if (claim == null) { if (claim == null) {
return 0; return false;
} }
final ResourceClaim resourceClaim = claim.getResourceClaim(); final ResourceClaim resourceClaim = claim.getResourceClaim();
if (resourceClaim == null) { if (resourceClaim == null) {
return 0; return false;
} }
return claimManager.getClaimantCount(resourceClaim); return !resourceClaim.isInUse();
} }
private void updateRepository(final Collection<RepositoryRecord> records, final boolean sync) throws IOException { private void updateRepository(final Collection<RepositoryRecord> records, final boolean sync) throws IOException {
@ -211,21 +211,30 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
final Set<ResourceClaim> claimsToAdd = new HashSet<>(); final Set<ResourceClaim> claimsToAdd = new HashSet<>();
for (final RepositoryRecord record : records) { for (final RepositoryRecord record : records) {
if (record.getType() == RepositoryRecordType.DELETE) { if (record.getType() == RepositoryRecordType.DELETE) {
// For any DELETE record that we have, if current claim's claimant count <= 0, mark it as destructable // For any DELETE record that we have, if claim is destructible, mark it so
if (record.getCurrentClaim() != null && getClaimantCount(record.getCurrentClaim()) <= 0) { if (record.getCurrentClaim() != null && isDestructable(record.getCurrentClaim())) {
claimsToAdd.add(record.getCurrentClaim().getResourceClaim()); claimsToAdd.add(record.getCurrentClaim().getResourceClaim());
} }
// If the original claim is different than the current claim and the original claim has a claimant count <= 0, mark it as destructable. // If the original claim is different than the current claim and the original claim is destructible, mark it so
if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && getClaimantCount(record.getOriginalClaim()) <= 0) { if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && isDestructable(record.getOriginalClaim())) {
claimsToAdd.add(record.getOriginalClaim().getResourceClaim()); claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
} }
} else if (record.getType() == RepositoryRecordType.UPDATE) { } else if (record.getType() == RepositoryRecordType.UPDATE) {
// if we have an update, and the original is no longer needed, mark original as destructable // if we have an update, and the original is no longer needed, mark original as destructible
if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && getClaimantCount(record.getOriginalClaim()) <= 0) { if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && isDestructable(record.getOriginalClaim())) {
claimsToAdd.add(record.getOriginalClaim().getResourceClaim()); claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
} }
} }
final List<ContentClaim> transientClaims = record.getTransientClaims();
if (transientClaims != null) {
for (final ContentClaim transientClaim : transientClaims) {
if (isDestructable(transientClaim)) {
claimsToAdd.add(transientClaim.getResourceClaim());
}
}
}
} }
if (!claimsToAdd.isEmpty()) { if (!claimsToAdd.isEmpty()) {

View File

@ -16,17 +16,17 @@
*/ */
package org.apache.nifi.controller.repository.claim; package org.apache.nifi.controller.repository.claim;
import java.util.concurrent.atomic.AtomicInteger;
public class StandardResourceClaim implements ResourceClaim, Comparable<ResourceClaim> { public class StandardResourceClaim implements ResourceClaim, Comparable<ResourceClaim> {
private final StandardResourceClaimManager claimManager;
private final String id; private final String id;
private final String container; private final String container;
private final String section; private final String section;
private final boolean lossTolerant; private final boolean lossTolerant;
private final AtomicInteger claimantCount = new AtomicInteger(0);
private final int hashCode; private final int hashCode;
private volatile boolean writable = true;
public StandardResourceClaim(final String container, final String section, final String id, final boolean lossTolerant) { public StandardResourceClaim(final StandardResourceClaimManager claimManager, final String container, final String section, final String id, final boolean lossTolerant) {
this.claimManager = claimManager;
this.container = container.intern(); this.container = container.intern();
this.section = section.intern(); this.section = section.intern();
this.id = id; this.id = id;
@ -64,18 +64,6 @@ public class StandardResourceClaim implements ResourceClaim, Comparable<Resource
return section; return section;
} }
int getClaimantCount() {
return claimantCount.get();
}
int decrementClaimantCount() {
return claimantCount.decrementAndGet();
}
int incrementClaimantCount() {
return claimantCount.incrementAndGet();
}
/** /**
* Provides the natural ordering for ResourceClaim objects. By default they are sorted by their id, then container, then section * Provides the natural ordering for ResourceClaim objects. By default they are sorted by their id, then container, then section
* *
@ -131,4 +119,27 @@ public class StandardResourceClaim implements ResourceClaim, Comparable<Resource
return "StandardResourceClaim[id=" + id + ", container=" + container + ", section=" + section + "]"; return "StandardResourceClaim[id=" + id + ", container=" + container + ", section=" + section + "]";
} }
@Override
public boolean isWritable() {
return writable;
}
/**
 * Freeze the Resource Claim so that it can no longer be written to
*/
void freeze() {
this.writable = false;
}
@Override
public boolean isInUse() {
// Note that it is critical here that we always check isWritable() BEFORE checking
// the claimant count. This is due to the fact that if the claim is in fact writable, the claimant count
// could increase. So if we first check claimant count and that is 0, and then we check isWritable, it may be
// that the claimant count has changed to 1 before checking isWritable.
// However, if isWritable() is false, then the only way that the claimant count can increase is if a FlowFile referencing
// the Resource Claim is cloned. In this case, though, the claimant count has not become 0.
// Said another way, if isWritable() == false, then the claimant count can never increase from 0.
return isWritable() || claimManager.getClaimantCount(this) > 0;
}
} }

View File

@ -36,7 +36,7 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
@Override @Override
public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant) { public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant) {
return new StandardResourceClaim(container, section, id, lossTolerant); return new StandardResourceClaim(this, container, section, id, lossTolerant);
} }
private static AtomicInteger getCounter(final ResourceClaim claim) { private static AtomicInteger getCounter(final ResourceClaim claim) {
@ -59,9 +59,12 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
if (claim == null) { if (claim == null) {
return 0; return 0;
} }
synchronized (claim) {
final AtomicInteger counter = claimantCounts.get(claim); final AtomicInteger counter = claimantCounts.get(claim);
return counter == null ? 0 : counter.get(); return counter == null ? 0 : counter.get();
} }
}
@Override @Override
public int decrementClaimantCount(final ResourceClaim claim) { public int decrementClaimantCount(final ResourceClaim claim) {
@ -69,19 +72,31 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
return 0; return 0;
} }
synchronized (claim) {
final AtomicInteger counter = claimantCounts.get(claim); final AtomicInteger counter = claimantCounts.get(claim);
if (counter == null) { if (counter == null) {
logger.debug("Decrementing claimant count for {} but claimant count is not known. Returning -1", claim); logger.warn("Decrementing claimant count for {} but claimant count is not known. Returning -1", claim);
return -1; return -1;
} }
final int newClaimantCount = counter.decrementAndGet(); final int newClaimantCount = counter.decrementAndGet();
if (newClaimantCount < 0) {
logger.error("Decremented claimant count for {} to {}", claim, newClaimantCount);
} else {
logger.debug("Decrementing claimant count for {} to {}", claim, newClaimantCount); logger.debug("Decrementing claimant count for {} to {}", claim, newClaimantCount);
}
if (newClaimantCount == 0) { if (newClaimantCount == 0) {
claimantCounts.remove(claim); removeClaimantCount(claim);
} }
return newClaimantCount; return newClaimantCount;
} }
}
// protected so that it can be used in unit tests
protected void removeClaimantCount(final ResourceClaim claim) {
claimantCounts.remove(claim);
}
@Override @Override
public int incrementClaimantCount(final ResourceClaim claim) { public int incrementClaimantCount(final ResourceClaim claim) {
@ -90,16 +105,23 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
@Override @Override
public int incrementClaimantCount(final ResourceClaim claim, final boolean newClaim) { public int incrementClaimantCount(final ResourceClaim claim, final boolean newClaim) {
if (claim == null) {
return 0;
}
synchronized (claim) {
final AtomicInteger counter = getCounter(claim); final AtomicInteger counter = getCounter(claim);
final int newClaimantCount = counter.incrementAndGet(); final int newClaimantCount = counter.incrementAndGet();
logger.debug("Incrementing claimant count for {} to {}", claim, newClaimantCount); logger.debug("Incrementing claimant count for {} to {}", claim, newClaimantCount);
// If the claimant count moved from 0 to 1, remove it from the queue of destructable claims. // If the claimant count moved from 0 to 1, remove it from the queue of destructable claims.
if (!newClaim && newClaimantCount == 1) { if (!newClaim && newClaimantCount == 1) {
destructableClaims.remove(claim); destructableClaims.remove(claim);
} }
return newClaimantCount; return newClaimantCount;
} }
}
@Override @Override
public void markDestructable(final ResourceClaim claim) { public void markDestructable(final ResourceClaim claim) {
@ -107,6 +129,7 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
return; return;
} }
synchronized (claim) {
if (getClaimantCount(claim) > 0) { if (getClaimantCount(claim) > 0) {
return; return;
} }
@ -118,6 +141,7 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
} catch (final InterruptedException ie) { } catch (final InterruptedException ie) {
} }
} }
}
@Override @Override
public void drainDestructableClaims(final Collection<ResourceClaim> destination, final int maxElements) { public void drainDestructableClaims(final Collection<ResourceClaim> destination, final int maxElements) {
@ -142,4 +166,16 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
claimantCounts.clear(); claimantCounts.clear();
} }
@Override
public void freeze(final ResourceClaim claim) {
if (claim == null) {
return;
}
if (!(claim instanceof StandardResourceClaim)) {
throw new IllegalArgumentException("The given resource claim is not managed by this Resource Claim Manager");
}
((StandardResourceClaim) claim).freeze();
}
} }

View File

@ -156,6 +156,10 @@ public class TestFileSystemSwapManager {
@Override @Override
public void purge() { public void purge() {
} }
@Override
public void freeze(ResourceClaim claim) {
}
} }

View File

@ -257,8 +257,8 @@ public class TestFileSystemRepository {
final Path claimPath = getPath(claim); final Path claimPath = getPath(claim);
// Create the file. // Create the file.
try (final OutputStream out = Files.newOutputStream(claimPath, StandardOpenOption.CREATE)) { try (final OutputStream out = repository.write(claim)) {
out.write("Hello".getBytes()); out.write(new byte[FileSystemRepository.MAX_APPENDABLE_CLAIM_LENGTH]);
} }
int count = repository.decrementClaimantCount(claim); int count = repository.decrementClaimantCount(claim);
@ -379,13 +379,13 @@ public class TestFileSystemRepository {
@Test(expected = ContentNotFoundException.class) @Test(expected = ContentNotFoundException.class)
public void testSizeWithNoContent() throws IOException { public void testSizeWithNoContent() throws IOException {
final ContentClaim claim = new StandardContentClaim(new StandardResourceClaim("container1", "section 1", "1", false), 0L); final ContentClaim claim = new StandardContentClaim(new StandardResourceClaim(claimManager, "container1", "section 1", "1", false), 0L);
assertEquals(0L, repository.size(claim)); assertEquals(0L, repository.size(claim));
} }
@Test(expected = ContentNotFoundException.class) @Test(expected = ContentNotFoundException.class)
public void testReadWithNoContent() throws IOException { public void testReadWithNoContent() throws IOException {
final ContentClaim claim = new StandardContentClaim(new StandardResourceClaim("container1", "section 1", "1", false), 0L); final ContentClaim claim = new StandardContentClaim(new StandardResourceClaim(claimManager, "container1", "section 1", "1", false), 0L);
final InputStream in = repository.read(claim); final InputStream in = repository.read(claim);
in.close(); in.close();
} }
@ -427,12 +427,12 @@ public class TestFileSystemRepository {
// write at least 1 MB to the output stream so that when we close the output stream // write at least 1 MB to the output stream so that when we close the output stream
// the repo won't keep the stream open. // the repo won't keep the stream open.
final byte[] buff = new byte[1024 * 1024]; final byte[] buff = new byte[FileSystemRepository.MAX_APPENDABLE_CLAIM_LENGTH];
out.write(buff); out.write(buff);
out.write(buff); out.write(buff);
// true because claimant count is still 1. // false because claimant count is still 1, so the resource claim was not removed
assertTrue(repository.remove(claim)); assertFalse(repository.remove(claim));
assertEquals(0, repository.decrementClaimantCount(claim)); assertEquals(0, repository.decrementClaimantCount(claim));

View File

@ -63,7 +63,6 @@ import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -99,6 +98,7 @@ public class TestStandardProcessSession {
private ProvenanceEventRepository provenanceRepo; private ProvenanceEventRepository provenanceRepo;
private MockFlowFileRepository flowFileRepo; private MockFlowFileRepository flowFileRepo;
private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build(); private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build();
private static StandardResourceClaimManager resourceClaimManager;
@After @After
public void cleanup() { public void cleanup() {
@ -136,6 +136,8 @@ public class TestStandardProcessSession {
@Before @Before
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void setup() throws IOException { public void setup() throws IOException {
resourceClaimManager = new StandardResourceClaimManager();
System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties"); System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
final FlowFileEventRepository flowFileEventRepo = Mockito.mock(FlowFileEventRepository.class); final FlowFileEventRepository flowFileEventRepo = Mockito.mock(FlowFileEventRepository.class);
final CounterRepository counterRepo = Mockito.mock(CounterRepository.class); final CounterRepository counterRepo = Mockito.mock(CounterRepository.class);
@ -815,7 +817,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis()) .entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
.size(1L) .size(1L)
.build(); .build();
flowFileQueue.put(flowFileRecord); flowFileQueue.put(flowFileRecord);
@ -963,7 +965,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis()) .entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
.size(1L) .size(1L)
.build(); .build();
flowFileQueue.put(flowFileRecord); flowFileQueue.put(flowFileRecord);
@ -987,7 +989,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis()) .entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
.build(); .build();
flowFileQueue.put(flowFileRecord); flowFileQueue.put(flowFileRecord);
@ -1003,7 +1005,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis()) .entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
.contentClaimOffset(1000L) .contentClaimOffset(1000L)
.size(1000L) .size(1000L)
.build(); .build();
@ -1028,7 +1030,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis()) .entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
.build(); .build();
flowFileQueue.put(flowFileRecord); flowFileQueue.put(flowFileRecord);
@ -1045,7 +1047,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis()) .entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
.contentClaimOffset(1000L).size(1L).build(); .contentClaimOffset(1000L).size(1L).build();
flowFileQueue.put(flowFileRecord2); flowFileQueue.put(flowFileRecord2);
@ -1114,7 +1116,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis()) .entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
.contentClaimOffset(0L).size(0L).build(); .contentClaimOffset(0L).size(0L).build();
flowFileQueue.put(flowFileRecord); flowFileQueue.put(flowFileRecord);
@ -1152,7 +1154,7 @@ public class TestStandardProcessSession {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis()) .entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L))
.contentClaimOffset(0L).size(0L).build(); .contentClaimOffset(0L).size(0L).build();
flowFileQueue.put(flowFileRecord); flowFileQueue.put(flowFileRecord);
@ -1479,7 +1481,7 @@ public class TestStandardProcessSession {
final Set<ContentClaim> claims = new HashSet<>(); final Set<ContentClaim> claims = new HashSet<>();
for (long i = 0; i < idGenerator.get(); i++) { for (long i = 0; i < idGenerator.get(); i++) {
final ResourceClaim resourceClaim = new StandardResourceClaim("container", "section", String.valueOf(i), false); final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim("container", "section", String.valueOf(i), false);
final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L); final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L);
if (getClaimantCount(contentClaim) > 0) { if (getClaimantCount(contentClaim) > 0) {
claims.add(contentClaim); claims.add(contentClaim);

View File

@ -44,7 +44,6 @@ import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.swap.StandardSwapContents; import org.apache.nifi.controller.swap.StandardSwapContents;
import org.apache.nifi.controller.swap.StandardSwapSummary; import org.apache.nifi.controller.swap.StandardSwapSummary;
@ -87,10 +86,10 @@ public class TestWriteAheadFlowFileRepository {
when(connection.getFlowFileQueue()).thenReturn(queue); when(connection.getFlowFileQueue()).thenReturn(queue);
queueProvider.addConnection(connection); queueProvider.addConnection(connection);
final ResourceClaim resourceClaim1 = new StandardResourceClaim("container", "section", "1", false); final ResourceClaim resourceClaim1 = claimManager.newResourceClaim("container", "section", "1", false);
final ContentClaim claim1 = new StandardContentClaim(resourceClaim1, 0L); final ContentClaim claim1 = new StandardContentClaim(resourceClaim1, 0L);
final ResourceClaim resourceClaim2 = new StandardResourceClaim("container", "section", "2", false); final ResourceClaim resourceClaim2 = claimManager.newResourceClaim("container", "section", "2", false);
final ContentClaim claim2 = new StandardContentClaim(resourceClaim2, 0L); final ContentClaim claim2 = new StandardContentClaim(resourceClaim2, 0L);
// Create a flowfile repo, update it once with a FlowFile that points to one resource claim. Then, // Create a flowfile repo, update it once with a FlowFile that points to one resource claim. Then,

View File

@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository.claim;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
/**
 * Unit tests for {@code StandardResourceClaimManager}, focused on the thread-safety of the
 * claimant-count bookkeeping shared between increment and decrement operations.
 */
public class TestStandardResourceClaimManager {

    /**
     * Regression test reproducing a race between {@code decrementClaimantCount()} and
     * {@code incrementClaimantCount()}: if a decrement drives the count to 0 and removes the
     * counter while a concurrent increment is in flight, the increment can be lost. The test
     * forces that interleaving by stalling the removal step until the incrementing thread has run.
     */
    @Test
    @Ignore("Unit test was created to repeat a concurrency bug in StandardResourceClaimManager. "
        + "However, now that the concurrency bug has been fixed, the test will deadlock. Leaving here for now in case it's valuable before the commit is pushed")
    public void testIncrementAndDecrementThreadSafety() throws InterruptedException {
        // Gate that keeps the decrementing thread parked inside removeClaimantCount()
        // until the incrementing thread has done its work.
        final AtomicBoolean waitToRemove = new AtomicBoolean(true);
        // Signals that the decrement has reached the removal step, so the increment may proceed.
        final CountDownLatch decrementComplete = new CountDownLatch(1);
        // Subclass the manager to hook the (protected, test-visible) removal step and
        // deterministically widen the race window.
        final StandardResourceClaimManager manager = new StandardResourceClaimManager() {
            @Override
            protected void removeClaimantCount(final ResourceClaim claim) {
                decrementComplete.countDown();
                // Busy-wait (with a short sleep) until the incrementing thread releases us.
                while (waitToRemove.get()) {
                    try {
                        Thread.sleep(10L);
                    } catch (final InterruptedException ie) {
                        Assert.fail("Interrupted while waiting to remove claimant count");
                    }
                }
                super.removeClaimantCount(claim);
            }
        };

        final ResourceClaim resourceClaim = manager.newResourceClaim("container", "section", "id", false);
        assertEquals(1, manager.incrementClaimantCount(resourceClaim)); // increment claimant count to 1.
        assertEquals(1, manager.getClaimantCount(resourceClaim));

        // Decrement the claimant count. This should decrement the count to 0. However, we have 'waitToRemove' set to true,
        // so the manager will not actually remove the claimant count (or return from this method) until we set 'waitToRemove'
        // to false. We do this so that we can increment the claimant count in a separate thread. Because we will be incrementing
        // the count in 1 thread and decrementing it in another thread, the end result should be that the claimant count is still
        // at 1.
        final Runnable decrementCountRunnable = new Runnable() {
            @Override
            public void run() {
                manager.decrementClaimantCount(resourceClaim);
            }
        };

        final Runnable incrementCountRunnable = new Runnable() {
            @Override
            public void run() {
                // Wait until the count has been decremented
                try {
                    decrementComplete.await();
                } catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail(e.toString());
                }

                // Increment the claimant count
                manager.incrementClaimantCount(resourceClaim);

                // allow the 'decrement Thread' to complete
                waitToRemove.set(false);
            }
        };

        // Start the threads so that the claim count is incremented and decremented at the same time
        final Thread decrementThread = new Thread(decrementCountRunnable);
        final Thread incrementThread = new Thread(incrementCountRunnable);

        decrementThread.start();
        incrementThread.start();

        // Wait for both threads to complete
        incrementThread.join();
        decrementThread.join();

        // claimant count should still be 1, since 1 thread incremented it and 1 thread decremented it!
        assertEquals(1, manager.getClaimantCount(resourceClaim));
    }
}

View File

@ -33,4 +33,6 @@
<root level="INFO"> <root level="INFO">
<appender-ref ref="CONSOLE"/> <appender-ref ref="CONSOLE"/>
</root> </root>
<logger name="StandardProcessSession.claims" level="DEBUG" />
</configuration> </configuration>

View File

@ -16,8 +16,22 @@
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.http.annotation.ThreadSafe; import org.apache.http.annotation.ThreadSafe;
import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
@ -25,24 +39,15 @@ import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
@ThreadSafe() @ThreadSafe()
@EventDriven() @EventDriven()
@Tags({"test", "debug", "processor", "utility", "flow", "FlowFile"}) @Tags({"test", "debug", "processor", "utility", "flow", "FlowFile"})
@ -167,6 +172,20 @@ public class DebugFlow extends AbstractProcessor {
} }
}) })
.build(); .build();
static final PropertyDescriptor WRITE_ITERATIONS = new PropertyDescriptor.Builder()
.name("Write Iterations")
.description("Number of times to write to the FlowFile")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.required(true)
.defaultValue("0")
.build();
static final PropertyDescriptor CONTENT_SIZE = new PropertyDescriptor.Builder()
.name("Content Size")
.description("The number of bytes to write each time that the FlowFile is written to")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.required(true)
.defaultValue("1 KB")
.build();
private volatile Integer flowFileMaxSuccess = 0; private volatile Integer flowFileMaxSuccess = 0;
private volatile Integer flowFileMaxFailure = 0; private volatile Integer flowFileMaxFailure = 0;
@ -225,6 +244,8 @@ public class DebugFlow extends AbstractProcessor {
propList.add(NO_FF_EXCEPTION_ITERATIONS); propList.add(NO_FF_EXCEPTION_ITERATIONS);
propList.add(NO_FF_YIELD_ITERATIONS); propList.add(NO_FF_YIELD_ITERATIONS);
propList.add(NO_FF_EXCEPTION_CLASS); propList.add(NO_FF_EXCEPTION_CLASS);
propList.add(WRITE_ITERATIONS);
propList.add(CONTENT_SIZE);
propertyDescriptors.compareAndSet(null, Collections.unmodifiableList(propList)); propertyDescriptors.compareAndSet(null, Collections.unmodifiableList(propList));
} }
return propertyDescriptors.get(); return propertyDescriptors.get();
@ -304,6 +325,23 @@ public class DebugFlow extends AbstractProcessor {
} }
return; return;
} else { } else {
final int writeIterations = context.getProperty(WRITE_ITERATIONS).asInteger();
if (writeIterations > 0 && pass == 1) {
final Random random = new Random();
for (int i = 0; i < writeIterations; i++) {
final byte[] data = new byte[context.getProperty(CONTENT_SIZE).asDataSize(DataUnit.B).intValue()];
random.nextBytes(data);
ff = session.write(ff, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(data);
}
});
}
}
if (curr_ff_resp.state() == FlowFileResponseState.FF_SUCCESS_RESPONSE) { if (curr_ff_resp.state() == FlowFileResponseState.FF_SUCCESS_RESPONSE) {
if (flowFileCurrSuccess < flowFileMaxSuccess) { if (flowFileCurrSuccess < flowFileMaxSuccess) {
flowFileCurrSuccess += 1; flowFileCurrSuccess += 1;

View File

@ -76,16 +76,6 @@ public class TestDebugFlow {
runner.setProperty(DebugFlow.NO_FF_YIELD_ITERATIONS, "0"); runner.setProperty(DebugFlow.NO_FF_YIELD_ITERATIONS, "0");
} }
@Test
public void testGetSupportedPropertyDescriptors() throws Exception {
assertEquals(11, debugFlow.getPropertyDescriptors().size());
}
@Test
public void testGetRelationships() throws Exception {
assertEquals(2, debugFlow.getRelationships().size());
}
private boolean isInContents(byte[] content) { private boolean isInContents(byte[] content) {
for (Map.Entry entry : contents.entrySet()) { for (Map.Entry entry : contents.entrySet()) {
if (((String)entry.getValue()).compareTo(new String(content)) == 0) { if (((String)entry.getValue()).compareTo(new String(content)) == 0) {