This commit is contained in:
Matt Gilman 2015-06-05 23:38:59 -04:00
commit 245fef4ee2
3 changed files with 22 additions and 4 deletions

View File

@ -100,6 +100,15 @@ public interface FlowFileQueue {
QueueSize getActiveQueueSize();
/**
* Returns a QueueSize that represents all FlowFiles that are 'unacknowledged'. A FlowFile
* is considered to be unacknowledged if it has been pulled from the queue by some component
* but the session that pulled the FlowFile has not yet been committed or rolled back.
*
* @return a QueueSize that represents all FlowFiles that are 'unacknowledged'.
*/
QueueSize getUnacknowledgedQueueSize();
void acknowledge(FlowFileRecord flowFile);
void acknowledge(Collection<FlowFileRecord> flowFiles);

View File

@ -961,6 +961,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
writeLock.unlock("external unlock");
}
@Override
public QueueSize getUnacknowledgedQueueSize() {
return unacknowledgedSizeRef.get();
}
private void updateUnacknowledgedSize(final int addToCount, final long addToSize) {
boolean updated = false;

View File

@ -181,6 +181,10 @@ public final class StandardConnection implements Connection {
throw new IllegalStateException("Cannot change destination of Connection because the current destination is running");
}
if (getFlowFileQueue().getUnacknowledgedQueueSize().getObjectCount() > 0) {
throw new IllegalStateException("Cannot change destination of Connection because FlowFiles from this Connection are currently held by " + previousDestination);
}
try {
previousDestination.removeConnection(this);
this.destination.set(newDestination);