From 72ff2a25d5455442d5aec27835f953e6ab36eca3 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 13 Oct 2015 15:46:56 -0400 Subject: [PATCH] NIFI-730: Updated queue sizes appropriately during dropping of flowfiles --- .../apache/nifi/controller/StandardFlowFileQueue.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index b699ceb266..82c1c7e9f2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -924,11 +924,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue { writeLock.lock(); try { dropRequest.setState(DropFlowFileState.DROPPING_ACTIVE_FLOWFILES); + dropRequest.setOriginalSize(getQueueSize()); try { final List activeQueueRecords = new ArrayList<>(activeQueue); QueueSize droppedSize = drop(activeQueueRecords, requestor); activeQueue.clear(); + activeQueueContentSize = 0; + activeQueueSizeRef.set(0); dropRequest.setCurrentSize(getQueueSize()); dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); @@ -936,6 +939,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue { swapQueue.clear(); dropRequest.setCurrentSize(getQueueSize()); dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); + swapMode = false; + swappedContentSize -= droppedSize.getByteCount(); + swappedRecordCount -= droppedSize.getObjectCount(); final Iterator swapLocationItr = swapLocations.iterator(); while (swapLocationItr.hasNext()) { @@ -943,12 +949,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final List swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this); try { droppedSize = drop(swappedIn, requestor); - dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); } catch (final Exception e) { activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue. throw e; } + dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); + swappedContentSize -= droppedSize.getByteCount(); + swappedRecordCount -= droppedSize.getObjectCount(); dropRequest.setCurrentSize(getQueueSize()); swapLocationItr.remove(); }