NIFI-41: Don't allow destination fo connection to change if current destination is holding its FlowFiles

This commit is contained in:
Mark Payne 2015-06-05 08:49:01 -04:00
parent 66f3b7e30f
commit 8d1536ed24
3 changed files with 22 additions and 4 deletions

View File

@ -100,6 +100,15 @@ public interface FlowFileQueue {
QueueSize getActiveQueueSize(); QueueSize getActiveQueueSize();
/**
* Returns a QueueSize that represents all FlowFiles that are 'unacknowledged'. A FlowFile
* is considered to be unacknowledged if it has been pulled from the queue by some component
* but the session that pulled the FlowFile has not yet been committed or rolled back.
*
* @return a QueueSize that represents all FlowFiles that are 'unacknowledged'.
*/
QueueSize getUnacknowledgedQueueSize();
void acknowledge(FlowFileRecord flowFile); void acknowledge(FlowFileRecord flowFile);
void acknowledge(Collection<FlowFileRecord> flowFiles); void acknowledge(Collection<FlowFileRecord> flowFiles);

View File

@ -229,7 +229,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
} }
return new QueueSize(activeQueue.size() + swappedRecordCount + unacknowledged.getObjectCount() + preFetchCount, return new QueueSize(activeQueue.size() + swappedRecordCount + unacknowledged.getObjectCount() + preFetchCount,
activeQueueContentSize + swappedContentSize + unacknowledged.getByteCount() + preFetchSize); activeQueueContentSize + swappedContentSize + unacknowledged.getByteCount() + preFetchSize);
} }
@Override @Override
@ -526,9 +526,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final QueueSize unacknowledged = unacknowledgedSizeRef.get(); final QueueSize unacknowledged = unacknowledgedSizeRef.get();
logger.debug("Total Queue Size: ActiveQueue={}/{} MB, Swap Queue={}/{} MB, Unacknowledged={}/{} MB", logger.debug("Total Queue Size: ActiveQueue={}/{} MB, Swap Queue={}/{} MB, Unacknowledged={}/{} MB",
activeQueue.size(), activeQueueContentSize / byteToMbDivisor, activeQueue.size(), activeQueueContentSize / byteToMbDivisor,
swappedRecordCount, swappedContentSize / byteToMbDivisor, swappedRecordCount, swappedContentSize / byteToMbDivisor,
unacknowledged.getObjectCount(), unacknowledged.getByteCount() / byteToMbDivisor); unacknowledged.getObjectCount(), unacknowledged.getByteCount() / byteToMbDivisor);
} }
return swapQueue.size(); return swapQueue.size();
@ -961,6 +961,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
writeLock.unlock("external unlock"); writeLock.unlock("external unlock");
} }
@Override
public QueueSize getUnacknowledgedQueueSize() {
return unacknowledgedSizeRef.get();
}
private void updateUnacknowledgedSize(final int addToCount, final long addToSize) { private void updateUnacknowledgedSize(final int addToCount, final long addToSize) {
boolean updated = false; boolean updated = false;

View File

@ -181,6 +181,10 @@ public final class StandardConnection implements Connection {
throw new IllegalStateException("Cannot change destination of Connection because the current destination is running"); throw new IllegalStateException("Cannot change destination of Connection because the current destination is running");
} }
if (getFlowFileQueue().getUnacknowledgedQueueSize().getObjectCount() > 0) {
throw new IllegalStateException("Cannot change destination of Connection because FlowFiles from this Connection are currently held by " + previousDestination);
}
try { try {
previousDestination.removeConnection(this); previousDestination.removeConnection(this);
this.destination.set(newDestination); this.destination.set(newDestination);