NIFI-730: Make cancel request actually cancel

This commit is contained in:
Mark Payne 2015-10-14 11:11:48 -04:00
parent 5867193bc1
commit a2ae99f899
2 changed files with 16 additions and 1 deletions

View File

@ -77,7 +77,7 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
} }
@Override @Override
public DropFlowFileState getState() { public synchronized DropFlowFileState getState() {
return state; return state;
} }

View File

@ -952,6 +952,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
QueueSize droppedSize; QueueSize droppedSize;
try { try {
if (dropRequest.getState() == DropFlowFileState.CANCELED) {
logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier);
return;
}
droppedSize = drop(activeQueueRecords, requestor); droppedSize = drop(activeQueueRecords, requestor);
logger.debug("For DropFlowFileRequest {}, Dropped {} from active queue", requestIdentifier, droppedSize); logger.debug("For DropFlowFileRequest {}, Dropped {} from active queue", requestIdentifier, droppedSize);
} catch (final IOException ioe) { } catch (final IOException ioe) {
@ -972,6 +977,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
logger.debug("For DropFlowFileRequest {}, Swap Queue has {} elements, Swapped Record Count = {}, Swapped Content Size = {}", logger.debug("For DropFlowFileRequest {}, Swap Queue has {} elements, Swapped Record Count = {}, Swapped Content Size = {}",
requestIdentifier, swapQueue.size(), swapSize.getObjectCount(), swapSize.getByteCount()); requestIdentifier, swapQueue.size(), swapSize.getObjectCount(), swapSize.getByteCount());
if (dropRequest.getState() == DropFlowFileState.CANCELED) {
logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier);
return;
}
droppedSize = drop(swapQueue, requestor); droppedSize = drop(swapQueue, requestor);
} catch (final IOException ioe) { } catch (final IOException ioe) {
logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString()); logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
@ -995,6 +1005,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
List<FlowFileRecord> swappedIn = null; List<FlowFileRecord> swappedIn = null;
try { try {
if (dropRequest.getState() == DropFlowFileState.CANCELED) {
logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier);
return;
}
swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this); swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
droppedSize = drop(swappedIn, requestor); droppedSize = drop(swappedIn, requestor);
} catch (final IOException ioe) { } catch (final IOException ioe) {