diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index ba01338fe6..3d378b47c5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -459,21 +459,25 @@ public class StandardFlowFileQueue implements FlowFileQueue { // keep up with queue), we will end up always processing the new FlowFiles first instead of the FlowFiles that arrived // first. if (!swapLocations.isEmpty()) { - final String swapLocation = swapLocations.remove(0); + final String swapLocation = swapLocations.get(0); boolean partialContents = false; SwapContents swapContents = null; try { swapContents = swapManager.swapIn(swapLocation, this); + swapLocations.remove(0); } catch (final IncompleteSwapFileException isfe) { logger.error("Failed to swap in all FlowFiles from Swap File {}; Swap File ended prematurely. The records that were present will still be swapped in", swapLocation); logger.error("", isfe); swapContents = isfe.getPartialContents(); partialContents = true; + swapLocations.remove(0); } catch (final FileNotFoundException fnfe) { logger.error("Failed to swap in FlowFiles from Swap File {} because the Swap File can no longer be found", swapLocation); if (eventReporter != null) { eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " + swapLocation + " because the Swap File can no longer be found"); } + + swapLocations.remove(0); return; } catch (final IOException ioe) { logger.error("Failed to swap in FlowFiles from Swap File {}; Swap File appears to be corrupt!", swapLocation); @@ -482,7 +486,17 @@ public class StandardFlowFileQueue implements FlowFileQueue { eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " + swapLocation + "; Swap File appears to be corrupt! Some FlowFiles in the queue may not be accessible. See logs for more information."); } + + // We do not remove the Swap File from swapLocations because the IOException may be recoverable later. For instance, the file may be on a network + // drive and we may have connectivity problems, etc. return; + } catch (final Throwable t) { + logger.error("Failed to swap in FlowFiles from Swap File {}", swapLocation, t); + + // We do not remove the Swap File from swapLocations because this is an unexpected failure that may be retry-able. For example, if there were + // an OOME, etc. then we don't want to he queue to still reflect that the data is around but never swap it in. By leaving the Swap File + // in swapLocations, we will continue to retry. + throw t; } final QueueSize swapSize = swapContents.getSummary().getQueueSize(); @@ -516,7 +530,7 @@ public class StandardFlowFileQueue implements FlowFileQueue { if (size.get().swappedCount > swapQueue.size()) { // we already have FlowFiles swapped out, so we won't migrate the queue; we will wait for - // an external process to swap FlowFiles back in. + // the files to be swapped back in first return; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index fc10d00eea..9d863d056f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -63,6 +63,7 @@ import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -583,6 +584,62 @@ public class TestStandardFlowFileQueue { } + @Test + public void testOOMEFollowedBySuccessfulSwapIn() { + final List flowFiles = new ArrayList<>(); + for (int i = 0; i < 50000; i++) { + flowFiles.add(new TestFlowFile()); + } + + queue.putAll(flowFiles); + + swapManager.failSwapInAfterN = 2; + swapManager.setSwapInFailure(new OutOfMemoryError("Intentional OOME for unit test")); + + final Set expiredRecords = new HashSet<>(); + for (int i = 0; i < 30000; i++) { + final FlowFileRecord polled = queue.poll(expiredRecords); + assertNotNull(polled); + } + + // verify that unexpected ERROR's are handled in such a way that we keep retrying + for (int i = 0; i < 3; i++) { + try { + queue.poll(expiredRecords); + Assert.fail("Expected OOME to be thrown"); + } catch (final OutOfMemoryError oome) { + // expected + } + } + + // verify that unexpected Runtime Exceptions are handled in such a way that we keep retrying + swapManager.setSwapInFailure(new NullPointerException("Intentional OOME for unit test")); + + for (int i = 0; i < 3; i++) { + try { + queue.poll(expiredRecords); + Assert.fail("Expected NPE to be thrown"); + } catch (final NullPointerException npe) { + // expected + } + } + + swapManager.failSwapInAfterN = -1; + + for (int i = 0; i < 20000; i++) { + final FlowFileRecord polled = queue.poll(expiredRecords); + assertNotNull(polled); + } + + queue.acknowledge(flowFiles); + assertNull(queue.poll(expiredRecords)); + assertEquals(0, queue.getActiveQueueSize().getObjectCount()); + assertEquals(0, queue.size().getObjectCount()); + + assertTrue(swapManager.swappedOut.isEmpty()); + } + + private class TestSwapManager implements FlowFileSwapManager { private final Map> swappedOut = new HashMap<>(); int swapOutCalledCount = 0; @@ -590,6 +647,13 @@ public class TestStandardFlowFileQueue { private int incompleteSwapFileRecordsToInclude = -1; + private int failSwapInAfterN = -1; + private Throwable failSwapInFailure = null; + + private void setSwapInFailure(final Throwable t) { + this.failSwapInFailure = t; + } + @Override public void initialize(final SwapManagerInitializationContext initializationContext) { @@ -622,6 +686,17 @@ public class TestStandardFlowFileQueue { final SwapContents partialContents = new StandardSwapContents(summary, partial); throw new IncompleteSwapFileException(swapLocation, partialContents); } + + if (swapInCalledCount > failSwapInAfterN && failSwapInAfterN > -1) { + if (failSwapInFailure instanceof RuntimeException) { + throw (RuntimeException) failSwapInFailure; + } + if (failSwapInFailure instanceof Error) { + throw (Error) failSwapInFailure; + } + + throw new RuntimeException(failSwapInFailure); + } } @Override