NIFI-730: Updated queue sizes appropriately during dropping of flowfiles

This commit is contained in:
Mark Payne 2015-10-13 15:46:56 -04:00
parent 4b41aaab02
commit 72ff2a25d5
1 changed files with 9 additions and 1 deletions

View File

@ -924,11 +924,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
writeLock.lock();
try {
dropRequest.setState(DropFlowFileState.DROPPING_ACTIVE_FLOWFILES);
dropRequest.setOriginalSize(getQueueSize());
try {
final List<FlowFileRecord> activeQueueRecords = new ArrayList<>(activeQueue);
QueueSize droppedSize = drop(activeQueueRecords, requestor);
activeQueue.clear();
activeQueueContentSize = 0;
activeQueueSizeRef.set(0);
dropRequest.setCurrentSize(getQueueSize());
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
@ -936,6 +939,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
swapQueue.clear();
dropRequest.setCurrentSize(getQueueSize());
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
swapMode = false;
swappedContentSize -= droppedSize.getByteCount();
swappedRecordCount -= droppedSize.getObjectCount();
final Iterator<String> swapLocationItr = swapLocations.iterator();
while (swapLocationItr.hasNext()) {
@ -943,12 +949,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
try {
droppedSize = drop(swappedIn, requestor);
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
} catch (final Exception e) {
activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue.
throw e;
}
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
swappedContentSize -= droppedSize.getByteCount();
swappedRecordCount -= droppedSize.getObjectCount();
dropRequest.setCurrentSize(getQueueSize());
swapLocationItr.remove();
}