diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java index 92a4ee0301..e1baeb7558 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java @@ -100,6 +100,15 @@ public interface FlowFileQueue { QueueSize getActiveQueueSize(); + /** + * Returns a QueueSize that represents all FlowFiles that are 'unacknowledged'. A FlowFile + * is considered to be unacknowledged if it has been pulled from the queue by some component + * but the session that pulled the FlowFile has not yet been committed or rolled back. + * + * @return a QueueSize that represents all FlowFiles that are 'unacknowledged'. + */ + QueueSize getUnacknowledgedQueueSize(); + void acknowledge(FlowFileRecord flowFile); void acknowledge(Collection flowFiles); diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 075f2cf5f3..8f6c8ed944 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -229,7 +229,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } return new QueueSize(activeQueue.size() + swappedRecordCount + unacknowledged.getObjectCount() + preFetchCount, - activeQueueContentSize + swappedContentSize + unacknowledged.getByteCount() + preFetchSize); + activeQueueContentSize + swappedContentSize + unacknowledged.getByteCount() + preFetchSize); } @Override @@ -526,9 +526,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final QueueSize unacknowledged = unacknowledgedSizeRef.get(); logger.debug("Total Queue Size: ActiveQueue={}/{} MB, Swap Queue={}/{} MB, Unacknowledged={}/{} MB", - activeQueue.size(), activeQueueContentSize / byteToMbDivisor, - swappedRecordCount, swappedContentSize / byteToMbDivisor, - unacknowledged.getObjectCount(), unacknowledged.getByteCount() / byteToMbDivisor); + activeQueue.size(), activeQueueContentSize / byteToMbDivisor, + swappedRecordCount, swappedContentSize / byteToMbDivisor, + unacknowledged.getObjectCount(), unacknowledged.getByteCount() / byteToMbDivisor); } return swapQueue.size(); @@ -961,6 +961,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue { writeLock.unlock("external unlock"); } + @Override + public QueueSize getUnacknowledgedQueueSize() { + return unacknowledgedSizeRef.get(); + } + private void updateUnacknowledgedSize(final int addToCount, final long addToSize) { boolean updated = false; diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java index 86c9320ff2..ad556e2c50 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java @@ -181,6 +181,10 @@ public final class StandardConnection implements Connection { throw new IllegalStateException("Cannot change destination of Connection because the current destination is running"); } + if (getFlowFileQueue().getUnacknowledgedQueueSize().getObjectCount() > 0) { + throw new IllegalStateException("Cannot change destination of Connection because FlowFiles from this Connection are currently held by " + previousDestination); + } + try { previousDestination.removeConnection(this); this.destination.set(newDestination);