From 978f483ba84b323674fc0134660bed2ee30da079 Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Thu, 22 Dec 2016 15:48:45 -0500 Subject: [PATCH] NIFI-3250 Fixing logic in StandardFlowFileQueue when migrating flow files to the active queue This closes #3250. Signed-off-by: Aldrin Piri --- .../controller/StandardFlowFileQueue.java | 3 +- .../controller/TestStandardFlowFileQueue.java | 65 +++++++++++++++++-- 2 files changed, 62 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index e20e250524..ba01338fe6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -448,7 +448,8 @@ public class StandardFlowFileQueue implements FlowFileQueue { // Swap Queue to the Active Queue. However, we don't do this if there are FlowFiles already swapped out // to disk, because we want them to be swapped back in in the same order that they were swapped out. - if (activeQueue.size() > swapThreshold - SWAP_RECORD_POLL_SIZE) { + final int activeQueueSize = activeQueue.size(); + if (activeQueueSize > 0 && activeQueueSize > swapThreshold - SWAP_RECORD_POLL_SIZE) { return; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index 3960d8d4ec..fc10d00eea 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -74,6 +74,12 @@ public class TestStandardFlowFileQueue { private TestSwapManager swapManager = null; private StandardFlowFileQueue queue = null; + private Connection connection = null; + private FlowFileRepository flowFileRepo = null; + private ProvenanceEventRepository provRepo = null; + private ResourceClaimManager claimManager = null; + private ProcessScheduler scheduler = null; + private List provRecords = new ArrayList<>(); @BeforeClass @@ -86,16 +92,16 @@ public class TestStandardFlowFileQueue { public void setup() { provRecords.clear(); - final Connection connection = Mockito.mock(Connection.class); + connection = Mockito.mock(Connection.class); Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class)); Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class)); - final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class); + scheduler = Mockito.mock(ProcessScheduler.class); swapManager = new TestSwapManager(); - final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class); - final ProvenanceEventRepository provRepo = Mockito.mock(ProvenanceEventRepository.class); - final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class); + flowFileRepo = Mockito.mock(FlowFileRepository.class); + provRepo = Mockito.mock(ProvenanceEventRepository.class); + claimManager = Mockito.mock(ResourceClaimManager.class); Mockito.when(provRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder()); Mockito.doAnswer(new Answer() { @@ -382,6 +388,55 @@ public class TestStandardFlowFileQueue { queue.poll(exp); } + @Test + public void testSwapInWhenThresholdIsLessThanSwapSize() { + // create a queue where the swap threshold is less than 10k + queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 1000); + + for (int i = 1; i <= 20000; i++) { + queue.put(new TestFlowFile()); + } + + assertEquals(1, swapManager.swappedOut.size()); + queue.put(new TestFlowFile()); + assertEquals(1, swapManager.swappedOut.size()); + + final Set exp = new HashSet<>(); + + // At this point there should be: + // 1k flow files in the active queue + // 9,001 flow files in the swap queue + // 10k flow files swapped to disk + + for (int i = 0; i < 999; i++) { // + final FlowFileRecord flowFile = queue.poll(exp); + assertNotNull(flowFile); + assertEquals(1, queue.getUnacknowledgedQueueSize().getObjectCount()); + assertEquals(1, queue.getUnacknowledgedQueueSize().getByteCount()); + + queue.acknowledge(Collections.singleton(flowFile)); + assertEquals(0, queue.getUnacknowledgedQueueSize().getObjectCount()); + assertEquals(0, queue.getUnacknowledgedQueueSize().getByteCount()); + } + + assertEquals(0, swapManager.swapInCalledCount); + assertEquals(1, queue.getActiveQueueSize().getObjectCount()); + assertNotNull(queue.poll(exp)); + + assertEquals(0, swapManager.swapInCalledCount); + assertEquals(0, queue.getActiveQueueSize().getObjectCount()); + + assertEquals(1, swapManager.swapOutCalledCount); + + assertNotNull(queue.poll(exp)); // this should trigger a swap-in of 10,000 records, and then pull 1 off the top. + assertEquals(1, swapManager.swapInCalledCount); + assertEquals(9999, queue.getActiveQueueSize().getObjectCount()); + + assertTrue(swapManager.swappedOut.isEmpty()); + + queue.poll(exp); + } + @Test public void testQueueCountsUpdatedWhenIncompleteSwapFile() { for (int i = 1; i <= 20000; i++) {