diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java index b81bd3fc02..c44283841b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java @@ -162,6 +162,9 @@ public class SwappablePriorityQueue { } migrateSwapToActive(); + if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) { + return; + } final int numSwapFiles = swapQueue.size() / SWAP_RECORD_POLL_SIZE; @@ -171,10 +174,11 @@ public class SwappablePriorityQueue { originalSwapQueueBytes += flowFile.getSize(); } - // Create a new Priority queue with the prioritizers that are set, but reverse the - // prioritizers because we want to pull the lowest-priority FlowFiles to swap out - final PriorityQueue tempQueue = new PriorityQueue<>(activeQueue.size() + swapQueue.size(), Collections.reverseOrder(new QueuePrioritizer(getPriorities()))); - tempQueue.addAll(activeQueue); + // Create a new Priority queue with the same prioritizers that are set for this queue. We want to swap out the highest priority data first, because + // whatever data we don't write out to a swap file (because there isn't enough to fill a swap file) will be added back to the swap queue. + // Since the swap queue cannot be processed until all swap files, we want to ensure that only the lowest priority data goes back onto it. Which means + // that we must swap out the highest priority data that is currently on the swap queue. + final PriorityQueue tempQueue = new PriorityQueue<>(swapQueue.size(), new QueuePrioritizer(getPriorities())); tempQueue.addAll(swapQueue); long bytesSwappedOut = 0L; @@ -221,23 +225,14 @@ public class SwappablePriorityQueue { // swap queue. Then add the records back to the active queue. swapQueue.clear(); long updatedSwapQueueBytes = 0L; - while (tempQueue.size() > swapThreshold) { - final FlowFileRecord record = tempQueue.poll(); + FlowFileRecord record; + while ((record = tempQueue.poll()) != null) { swapQueue.add(record); updatedSwapQueueBytes += record.getSize(); } Collections.reverse(swapQueue); // currently ordered in reverse priority order based on the ordering of the temp queue - // replace the contents of the active queue, since we've merged it with the swap queue. - activeQueue.clear(); - FlowFileRecord toRequeue; - long activeQueueBytes = 0L; - while ((toRequeue = tempQueue.poll()) != null) { - activeQueue.offer(toRequeue); - activeQueueBytes += toRequeue.getSize(); - } - boolean updated = false; while (!updated) { final FlowFileQueueSize originalSize = getFlowFileQueueSize(); @@ -245,13 +240,13 @@ public class SwappablePriorityQueue { final int addedSwapRecords = swapQueue.size() - originalSwapQueueCount; final long addedSwapBytes = updatedSwapQueueBytes - originalSwapQueueBytes; - final FlowFileQueueSize newSize = new FlowFileQueueSize(activeQueue.size(), activeQueueBytes, + final FlowFileQueueSize newSize = new FlowFileQueueSize(originalSize.getActiveCount(), originalSize.getActiveBytes(), originalSize.getSwappedCount() + addedSwapRecords + flowFilesSwappedOut, originalSize.getSwappedBytes() + addedSwapBytes + bytesSwappedOut, originalSize.getSwapFileCount() + numSwapFiles, originalSize.getUnacknowledgedCount(), originalSize.getUnacknowledgedBytes()); - updated = updateSize(originalSize, newSize); + updated = updateSize(originalSize, newSize); if (updated) { logIfNegative(originalSize, newSize, "swap"); } @@ -286,9 +281,7 @@ public class SwappablePriorityQueue { // Calling this method when records are polled prevents this condition by migrating FlowFiles from the // Swap Queue to the Active Queue. However, we don't do this if there are FlowFiles already swapped out // to disk, because we want them to be swapped back in in the same order that they were swapped out. - - final int activeQueueSize = activeQueue.size(); - if (activeQueueSize > 0 && activeQueueSize > swapThreshold - SWAP_RECORD_POLL_SIZE) { + if (!activeQueue.isEmpty()) { return; } @@ -315,20 +308,33 @@ public class SwappablePriorityQueue { return; } + // Swap Queue is not currently ordered. We want to migrate the highest priority FlowFiles to the Active Queue, then re-queue the lowest priority items. + final PriorityQueue tempQueue = new PriorityQueue<>(swapQueue.size(), new QueuePrioritizer(getPriorities())); + tempQueue.addAll(swapQueue); + int recordsMigrated = 0; long bytesMigrated = 0L; - final Iterator swapItr = swapQueue.iterator(); - while (activeQueue.size() < swapThreshold && swapItr.hasNext()) { - final FlowFileRecord toMigrate = swapItr.next(); + while (activeQueue.size() < swapThreshold) { + final FlowFileRecord toMigrate = tempQueue.poll(); + if (toMigrate == null) { + break; + } + activeQueue.add(toMigrate); bytesMigrated += toMigrate.getSize(); recordsMigrated++; - swapItr.remove(); + } + + swapQueue.clear(); + FlowFileRecord toRequeue; + while ((toRequeue = tempQueue.poll()) != null) { + swapQueue.add(toRequeue); } if (recordsMigrated > 0) { incrementActiveQueueSize(recordsMigrated, bytesMigrated); incrementSwapQueueSize(-recordsMigrated, -bytesMigrated, 0); + logger.debug("Migrated {} FlowFiles from swap queue to active queue for {}", recordsMigrated, this); } if (size.getSwappedCount() == 0) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index da7a6ee0f0..089ac90daa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -56,7 +56,6 @@ import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.stream.io.ByteCountingInputStream; import org.apache.nifi.stream.io.ByteCountingOutputStream; import org.apache.nifi.stream.io.StreamUtils; -import org.rocksdb.Checkpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index b454d2dece..4cb28a25cf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -314,28 +314,6 @@ public class TestStandardFlowFileQueue { assertEquals(10000, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount()); } - @Test - public void testLowestPrioritySwappedOutFirst() { - final List prioritizers = new ArrayList<>(); - prioritizers.add(new FlowFileSizePrioritizer()); - queue.setPriorities(prioritizers); - - long maxSize = 20000; - for (int i = 1; i <= 20000; i++) { - queue.put(new MockFlowFileRecord(maxSize - i)); - } - - assertEquals(1, swapManager.swapOutCalledCount); - assertEquals(20000, queue.size().getObjectCount()); - - assertEquals(10000, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount()); - final List flowFiles = queue.poll(Integer.MAX_VALUE, new HashSet()); - assertEquals(10000, flowFiles.size()); - for (int i = 0; i < 10000; i++) { - assertEquals(i, flowFiles.get(i).getSize()); - } - } - @Test public void testSwapIn() { for (int i = 1; i <= 20000; i++) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java index ef1a06353d..79be6ed182 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java @@ -26,9 +26,11 @@ import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.queue.SwappablePriorityQueue; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.events.EventReporter; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.StringUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -78,6 +80,193 @@ public class TestSwappablePriorityQueue { queue = new SwappablePriorityQueue(swapManager, 10000, eventReporter, flowFileQueue, dropAction, "local"); } + @Test + public void testPrioritizersBigQueue() { + final FlowFilePrioritizer iAttributePrioritizer = new FlowFilePrioritizer() { + @Override + public int compare(final FlowFile o1, final FlowFile o2) { + final int i1 = Integer.parseInt(o1.getAttribute("i")); + final int i2 = Integer.parseInt(o2.getAttribute("i")); + return Integer.compare(i1, i2); + } + }; + + queue.setPriorities(Collections.singletonList(iAttributePrioritizer)); + final int iterations = 29000; + + for (int i=0; i < iterations; i++) { + final MockFlowFile flowFile = new MockFlowFile(i); + flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i))); + queue.put(flowFile); + } + + for (int i=0; i < iterations; i++) { + final MockFlowFile flowFile = new MockFlowFile(i + iterations); + flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i + iterations))); + + final FlowFileRecord polled = queue.poll(Collections.emptySet(), 0L); + assertEquals(polled.getAttribute("i"), String.valueOf(i)); + + queue.put(flowFile); + } + + // Make sure that the data is pulled from the queue and added back a couple of times. + // This will trigger swapping to occur, but also leave a lot of data in memory on the queue. + // This specifically tests the edge case where data is swapped out, and we want to make sure that + // when we read from the queue, that we swap the data back in before processing anything on the + // pending 'swap queue' internally. + repopulateQueue(); + repopulateQueue(); + + int i=iterations; + FlowFileRecord flowFile; + while ((flowFile = queue.poll(Collections.emptySet(), 0)) != null) { + assertEquals(String.valueOf(i), flowFile.getAttribute("i")); + i++; + } + } + + + @Test + public void testOrderingWithCornerCases() { + final FlowFilePrioritizer iAttributePrioritizer = new FlowFilePrioritizer() { + @Override + public int compare(final FlowFile o1, final FlowFile o2) { + final int i1 = Integer.parseInt(o1.getAttribute("i")); + final int i2 = Integer.parseInt(o2.getAttribute("i")); + return Integer.compare(i1, i2); + } + }; + + queue.setPriorities(Collections.singletonList(iAttributePrioritizer)); + + for (final int queueSize : new int[] {1, 9999, 10_000, 10_001, 19_999, 20_000, 20_001}) { + System.out.println("Queue Size: " + queueSize); + + for (int i=0; i < queueSize; i++) { + final MockFlowFile flowFile = new MockFlowFile(i); + flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i))); + queue.put(flowFile); + } + + for (int i=0; i < queueSize; i++) { + final FlowFileRecord flowFile = queue.poll(Collections.emptySet(), 0); + assertEquals(String.valueOf(i), flowFile.getAttribute("i")); + } + + assertNull(queue.poll(Collections.emptySet(), 0)); + } + } + + @Test + public void testPrioritizerWhenOutOfOrderDataEntersSwapQueue() { + final FlowFilePrioritizer iAttributePrioritizer = new FlowFilePrioritizer() { + @Override + public int compare(final FlowFile o1, final FlowFile o2) { + final int i1 = Integer.parseInt(o1.getAttribute("i")); + final int i2 = Integer.parseInt(o2.getAttribute("i")); + return Integer.compare(i1, i2); + } + }; + + queue.setPriorities(Collections.singletonList(iAttributePrioritizer)); + + // Add 10,000 FlowFiles to the queue. These will all go to the active queue. + final int iterations = 10000; + for (int i=0; i < iterations; i++) { + final MockFlowFile flowFile = new MockFlowFile(i); + flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i))); + queue.put(flowFile); + } + + // Added 3 FlowFiles to the queue. These will all go to the Swap Queue. + for (final String iValue : new String[] {"10000", "-5", "8000"}) { + final MockFlowFile swapQueueFlowFile1 = new MockFlowFile(10_000); + swapQueueFlowFile1.putAttributes(Collections.singletonMap("i", iValue)); + queue.put(swapQueueFlowFile1); + } + + // The first 10,000 should be ordered. Then all FlowFiles on the swap queue should be transferred over, as a single unit, just as they would be in a swap file. + for (int i=0; i < iterations; i++) { + final FlowFileRecord flowFile = queue.poll(Collections.emptySet(), 0); + assertEquals(String.valueOf(i), flowFile.getAttribute("i")); + } + + for (final String iValue : new String[] {"-5", "8000", "10000"}) { + final FlowFileRecord flowFile = queue.poll(Collections.emptySet(), 0); + assertEquals(iValue, flowFile.getAttribute("i")); + } + } + + @Test + public void testPrioritizersDataAddedAfterSwapOccurs() { + final FlowFilePrioritizer iAttributePrioritizer = new FlowFilePrioritizer() { + @Override + public int compare(final FlowFile o1, final FlowFile o2) { + final int i1 = Integer.parseInt(o1.getAttribute("i")); + final int i2 = Integer.parseInt(o2.getAttribute("i")); + return Integer.compare(i1, i2); + } + }; + + queue.setPriorities(Collections.singletonList(iAttributePrioritizer)); + final int iterations = 29000; + + for (int i=0; i < iterations; i++) { + final MockFlowFile flowFile = new MockFlowFile(i); + flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i))); + queue.put(flowFile); + } + + for (int i=0; i < iterations; i++) { + final MockFlowFile flowFile = new MockFlowFile(i + iterations); + flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i + iterations))); + + final FlowFileRecord polled = queue.poll(Collections.emptySet(), 0L); + assertEquals(polled.getAttribute("i"), String.valueOf(i)); + + queue.put(flowFile); + } + + // Make sure that the data is pulled from the queue and added back a couple of times. + // This will trigger swapping to occur, but also leave a lot of data in memory on the queue. + // This specifically tests the edge case where data is swapped out, and we want to make sure that + // when we read from the queue, that we swap the data back in before processing anything on the + // pending 'swap queue' internally. + repopulateQueue(); + repopulateQueue(); + + // Add enough data for another swap file to get created. + final int baseI = iterations * 2; + for (int i=0; i < 10_000; i++) { + final MockFlowFile flowFile = new MockFlowFile(i); + flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(baseI + i))); + queue.put(flowFile); + } + + repopulateQueue(); + + int i=iterations; + FlowFileRecord flowFile; + while ((flowFile = queue.poll(Collections.emptySet(), 0)) != null) { + assertEquals(String.valueOf(i), flowFile.getAttribute("i")); + i++; + } + } + + private void repopulateQueue() { + final List attrs = new ArrayList<>(); + final List ffs = new ArrayList<>(); + FlowFileRecord ff; + while ((ff = queue.poll(Collections.emptySet(), 0L)) != null) { + ffs.add(ff); + attrs.add(ff.getAttribute("i")); + } + + ffs.forEach(queue::put); + System.out.println(StringUtils.join(attrs, ", ")); + } + @Test public void testSwapOutFailureLeavesCorrectQueueSize() { @@ -241,13 +430,45 @@ public class TestSwappablePriorityQueue { assertEquals(20000, queue.size().getObjectCount()); assertEquals(10000, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount()); - final List flowFiles = queue.poll(Integer.MAX_VALUE, new HashSet(), 500000); - assertEquals(10000, flowFiles.size()); + + // The first 10,000 FlowFiles to be added to the queue will be sorted by size (first 10,000 because that's the swap threshold, by size because of the prioritizer). + // The next 10,000 spill over to the swap queue. So we expect the first 10,000 FlowFiles to be size 10,000 to 20,000. Then the next 10,000 to be sized 0 to 9,999. + final List firstBatch = queue.poll(Integer.MAX_VALUE, Collections.emptySet(), 0); + assertEquals(10000, firstBatch.size()); for (int i = 0; i < 10000; i++) { - assertEquals(i, flowFiles.get(i).getSize()); + assertEquals(10_000 + i, firstBatch.get(i).getSize()); } + + final List secondBatch = queue.poll(Integer.MAX_VALUE, Collections.emptySet(), 0); + assertEquals(10000, secondBatch.size()); + for (int i = 0; i < 10000; i++) { + assertEquals(i, secondBatch.get(i).getSize()); + } + } + @Test + public void testPrioritiesKeptIntactBeforeSwap() { + final List prioritizers = new ArrayList<>(); + prioritizers.add((o1, o2) -> Long.compare(o1.getSize(), o2.getSize())); + queue.setPriorities(prioritizers); + + int maxSize = 9999; + for (int i = 1; i <= maxSize; i++) { + queue.put(new MockFlowFileRecord(maxSize - i)); + } + + assertEquals(0, swapManager.swapOutCalledCount); + assertEquals(maxSize, queue.size().getObjectCount()); + + assertEquals(9999, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount()); + + FlowFileRecord flowFile; + int i=0; + while ((flowFile = queue.poll(Collections.emptySet(), 0L)) != null) { + assertEquals(i++, flowFile.getSize()); + } + } @Test public void testSwapIn() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/StandaloneSwapFileIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/StandaloneSwapFileIT.java index c93bf7ccd7..88470ff459 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/StandaloneSwapFileIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/StandaloneSwapFileIT.java @@ -35,10 +35,8 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; public class StandaloneSwapFileIT extends FrameworkIntegrationTest { - @Test + @Test(timeout=60_000) public void testSwapOnRestart() throws ExecutionException, InterruptedException, IOException { - Thread.sleep(20000L); - final ProcessorNode generator = createProcessorNode(GenerateProcessor.class); generator.setProperties(Collections.singletonMap(GenerateProcessor.COUNT.getName(), "60000"));