diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java index 7d5b9c2935..737fbe3884 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java @@ -70,4 +70,9 @@ public interface DropFlowFileStatus { * @return the current state of the operation */ DropFlowFileState getState(); + + /** + * @return the reason that the state is set to a Failure state, or null if the state is not {@link DropFlowFileState#FAILURE}. + */ + String getFailureReason(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java index 410430874a..189fe7deab 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java @@ -30,6 +30,7 @@ public class DropFlowFileRequest implements DropFlowFileStatus { private volatile QueueSize droppedSize = new QueueSize(0, 0L); private volatile long lastUpdated = System.currentTimeMillis(); private volatile Thread executionThread; + private volatile String failureReason; private DropFlowFileState state = DropFlowFileState.WAITING_FOR_LOCK; @@ -85,8 +86,18 @@ public class DropFlowFileRequest implements DropFlowFileStatus { return lastUpdated; } + @Override + public String getFailureReason() { + return failureReason; + } + synchronized void setState(final DropFlowFileState state) { + setState(state, null); + } + + synchronized void setState(final DropFlowFileState state, final String explanation) { this.state = state; + this.failureReason = explanation; this.lastUpdated = System.currentTimeMillis(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 82c1c7e9f2..5b137f7f46 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -928,14 +928,34 @@ public final class StandardFlowFileQueue implements FlowFileQueue { try { final List activeQueueRecords = new ArrayList<>(activeQueue); - QueueSize droppedSize = drop(activeQueueRecords, requestor); + + QueueSize droppedSize; + try { + droppedSize = drop(activeQueueRecords, requestor); + } catch (final IOException ioe) { + logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString()); + logger.error("", ioe); + + dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString()); + return; + } + activeQueue.clear(); activeQueueContentSize = 0; activeQueueSizeRef.set(0); dropRequest.setCurrentSize(getQueueSize()); dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); - droppedSize = drop(swapQueue, requestor); + try { + droppedSize = drop(swapQueue, requestor); + } catch (final IOException ioe) { + logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString()); + logger.error("", ioe); + + dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString()); + return; + } + swapQueue.clear(); dropRequest.setCurrentSize(getQueueSize()); dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); @@ -946,12 +966,22 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final Iterator swapLocationItr = swapLocations.iterator(); while (swapLocationItr.hasNext()) { final String swapLocation = swapLocationItr.next(); - final List swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this); + + List swappedIn = null; try { + swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this); droppedSize = drop(swappedIn, requestor); - } catch (final Exception e) { - activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue. - throw e; + } catch (final IOException ioe) { + logger.error("Failed to swap in FlowFiles from Swap File {} in order to drop the FlowFiles for Connection {} due to {}", + swapLocation, StandardFlowFileQueue.this.getIdentifier(), ioe.toString()); + logger.error("", ioe); + + dropRequest.setState(DropFlowFileState.FAILURE, "Failed to swap in FlowFiles from Swap File " + swapLocation + " due to " + ioe.toString()); + if (swappedIn != null) { + activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue. + } + + return; } dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); @@ -963,8 +993,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { dropRequest.setState(DropFlowFileState.COMPLETE); } catch (final Exception e) { - // TODO: Handle adequately - dropRequest.setState(DropFlowFileState.FAILURE); + dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + e.toString()); } } finally { writeLock.unlock("Drop FlowFiles");