From a2ae99f89965a3fe1bd6591204bdb187a377ae1c Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 14 Oct 2015 11:11:48 -0400 Subject: [PATCH] NIFI-730: Make cancel request actually cancel --- .../nifi/controller/DropFlowFileRequest.java | 2 +- .../nifi/controller/StandardFlowFileQueue.java | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java index 58695c2cfb..7eea86ad09 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java @@ -77,7 +77,7 @@ public class DropFlowFileRequest implements DropFlowFileStatus { } @Override - public DropFlowFileState getState() { + public synchronized DropFlowFileState getState() { return state; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 8085760b0b..062c4248e1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -952,6 +952,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue { QueueSize droppedSize; try { + if (dropRequest.getState() == DropFlowFileState.CANCELED) { + logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier); + return; + } + droppedSize = drop(activeQueueRecords, requestor); logger.debug("For DropFlowFileRequest {}, Dropped {} from active queue", requestIdentifier, droppedSize); } catch (final IOException ioe) { @@ -972,6 +977,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue { logger.debug("For DropFlowFileRequest {}, Swap Queue has {} elements, Swapped Record Count = {}, Swapped Content Size = {}", requestIdentifier, swapQueue.size(), swapSize.getObjectCount(), swapSize.getByteCount()); + if (dropRequest.getState() == DropFlowFileState.CANCELED) { + logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier); + return; + } + droppedSize = drop(swapQueue, requestor); } catch (final IOException ioe) { logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString()); @@ -995,6 +1005,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue { List swappedIn = null; try { + if (dropRequest.getState() == DropFlowFileState.CANCELED) { + logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier); + return; + } + swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this); droppedSize = drop(swappedIn, requestor); } catch (final IOException ioe) {