From 66d5ab80eb22f535da0898ae0d6a4a5da2dd7bd9 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 10 Jan 2020 11:27:57 -0500 Subject: [PATCH] NIFI-7011: This closes #3983. SwappablePriorityQueue contains two internal data structures: activeQueue, swapQueue. activeQueue is intended to be pulled from for processing. swapQueue is intended to hold FlowFiles that are waiting to be swapped out. We want to ensure that we first swap in any data that has already been swapped out before processing the swap queue, in order to ensure that we process the data in the correct order. This fix addresses an issue where data was being swapped out by writing the lowest priority data to a swap file, then adding the highest priority data to activeQueue and the 'middle' priority data back to swapQueue. As a result, when polling from the queue we got highest priority data, followed by lowest priority data, followed by middle priority data. This is addressed by avoiding putting anything back on swapQueue when we swap out. Instead, write data to the swap file, then push everything else to activeQueue. This way, any new data that comes in will still go to the swapQueue, as it should, but all data that didn't get written to the Swap file will be processed before the low priority data in the swap file. 
NIFI-7011: Addressed corner case where data could be inserted out of order still if added while swapping was taking place NIFI-7011: Fixed ordering issue with swap queue that can occur if data is migrated from swap queue to active queue instead of being swapped out --- .../queue/SwappablePriorityQueue.java | 54 +++-- .../repository/StandardProcessSession.java | 1 - .../controller/TestStandardFlowFileQueue.java | 22 -- .../clustered/TestSwappablePriorityQueue.java | 227 +++++++++++++++++- .../swap/StandaloneSwapFileIT.java | 4 +- 5 files changed, 255 insertions(+), 53 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java index b81bd3fc02..c44283841b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java @@ -162,6 +162,9 @@ public class SwappablePriorityQueue { } migrateSwapToActive(); + if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) { + return; + } final int numSwapFiles = swapQueue.size() / SWAP_RECORD_POLL_SIZE; @@ -171,10 +174,11 @@ public class SwappablePriorityQueue { originalSwapQueueBytes += flowFile.getSize(); } - // Create a new Priority queue with the prioritizers that are set, but reverse the - // prioritizers because we want to pull the lowest-priority FlowFiles to swap out - final PriorityQueue tempQueue = new PriorityQueue<>(activeQueue.size() + swapQueue.size(), Collections.reverseOrder(new QueuePrioritizer(getPriorities()))); - tempQueue.addAll(activeQueue); + // Create a new Priority queue with the same prioritizers that are 
set for this queue. We want to swap out the highest priority data first, because + // whatever data we don't write out to a swap file (because there isn't enough to fill a swap file) will be added back to the swap queue. + // Since the swap queue cannot be processed until all swap files, we want to ensure that only the lowest priority data goes back onto it. Which means + // that we must swap out the highest priority data that is currently on the swap queue. + final PriorityQueue tempQueue = new PriorityQueue<>(swapQueue.size(), new QueuePrioritizer(getPriorities())); tempQueue.addAll(swapQueue); long bytesSwappedOut = 0L; @@ -221,23 +225,14 @@ public class SwappablePriorityQueue { // swap queue. Then add the records back to the active queue. swapQueue.clear(); long updatedSwapQueueBytes = 0L; - while (tempQueue.size() > swapThreshold) { - final FlowFileRecord record = tempQueue.poll(); + FlowFileRecord record; + while ((record = tempQueue.poll()) != null) { swapQueue.add(record); updatedSwapQueueBytes += record.getSize(); } Collections.reverse(swapQueue); // currently ordered in reverse priority order based on the ordering of the temp queue - // replace the contents of the active queue, since we've merged it with the swap queue. 
- activeQueue.clear(); - FlowFileRecord toRequeue; - long activeQueueBytes = 0L; - while ((toRequeue = tempQueue.poll()) != null) { - activeQueue.offer(toRequeue); - activeQueueBytes += toRequeue.getSize(); - } - boolean updated = false; while (!updated) { final FlowFileQueueSize originalSize = getFlowFileQueueSize(); @@ -245,13 +240,13 @@ public class SwappablePriorityQueue { final int addedSwapRecords = swapQueue.size() - originalSwapQueueCount; final long addedSwapBytes = updatedSwapQueueBytes - originalSwapQueueBytes; - final FlowFileQueueSize newSize = new FlowFileQueueSize(activeQueue.size(), activeQueueBytes, + final FlowFileQueueSize newSize = new FlowFileQueueSize(originalSize.getActiveCount(), originalSize.getActiveBytes(), originalSize.getSwappedCount() + addedSwapRecords + flowFilesSwappedOut, originalSize.getSwappedBytes() + addedSwapBytes + bytesSwappedOut, originalSize.getSwapFileCount() + numSwapFiles, originalSize.getUnacknowledgedCount(), originalSize.getUnacknowledgedBytes()); - updated = updateSize(originalSize, newSize); + updated = updateSize(originalSize, newSize); if (updated) { logIfNegative(originalSize, newSize, "swap"); } @@ -286,9 +281,7 @@ public class SwappablePriorityQueue { // Calling this method when records are polled prevents this condition by migrating FlowFiles from the // Swap Queue to the Active Queue. However, we don't do this if there are FlowFiles already swapped out // to disk, because we want them to be swapped back in in the same order that they were swapped out. - - final int activeQueueSize = activeQueue.size(); - if (activeQueueSize > 0 && activeQueueSize > swapThreshold - SWAP_RECORD_POLL_SIZE) { + if (!activeQueue.isEmpty()) { return; } @@ -315,20 +308,33 @@ public class SwappablePriorityQueue { return; } + // Swap Queue is not currently ordered. We want to migrate the highest priority FlowFiles to the Active Queue, then re-queue the lowest priority items. 
+ final PriorityQueue tempQueue = new PriorityQueue<>(swapQueue.size(), new QueuePrioritizer(getPriorities())); + tempQueue.addAll(swapQueue); + int recordsMigrated = 0; long bytesMigrated = 0L; - final Iterator swapItr = swapQueue.iterator(); - while (activeQueue.size() < swapThreshold && swapItr.hasNext()) { - final FlowFileRecord toMigrate = swapItr.next(); + while (activeQueue.size() < swapThreshold) { + final FlowFileRecord toMigrate = tempQueue.poll(); + if (toMigrate == null) { + break; + } + activeQueue.add(toMigrate); bytesMigrated += toMigrate.getSize(); recordsMigrated++; - swapItr.remove(); + } + + swapQueue.clear(); + FlowFileRecord toRequeue; + while ((toRequeue = tempQueue.poll()) != null) { + swapQueue.add(toRequeue); } if (recordsMigrated > 0) { incrementActiveQueueSize(recordsMigrated, bytesMigrated); incrementSwapQueueSize(-recordsMigrated, -bytesMigrated, 0); + logger.debug("Migrated {} FlowFiles from swap queue to active queue for {}", recordsMigrated, this); } if (size.getSwappedCount() == 0) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index da7a6ee0f0..089ac90daa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -56,7 +56,6 @@ import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.stream.io.ByteCountingInputStream; import org.apache.nifi.stream.io.ByteCountingOutputStream; import org.apache.nifi.stream.io.StreamUtils; -import org.rocksdb.Checkpoint; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index b454d2dece..4cb28a25cf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -314,28 +314,6 @@ public class TestStandardFlowFileQueue { assertEquals(10000, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount()); } - @Test - public void testLowestPrioritySwappedOutFirst() { - final List prioritizers = new ArrayList<>(); - prioritizers.add(new FlowFileSizePrioritizer()); - queue.setPriorities(prioritizers); - - long maxSize = 20000; - for (int i = 1; i <= 20000; i++) { - queue.put(new MockFlowFileRecord(maxSize - i)); - } - - assertEquals(1, swapManager.swapOutCalledCount); - assertEquals(20000, queue.size().getObjectCount()); - - assertEquals(10000, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount()); - final List flowFiles = queue.poll(Integer.MAX_VALUE, new HashSet()); - assertEquals(10000, flowFiles.size()); - for (int i = 0; i < 10000; i++) { - assertEquals(i, flowFiles.get(i).getSize()); - } - } - @Test public void testSwapIn() { for (int i = 1; i <= 20000; i++) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java index ef1a06353d..79be6ed182 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java @@ -26,9 +26,11 @@ import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.queue.SwappablePriorityQueue; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.events.EventReporter; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.StringUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -78,6 +80,193 @@ public class TestSwappablePriorityQueue { queue = new SwappablePriorityQueue(swapManager, 10000, eventReporter, flowFileQueue, dropAction, "local"); } + @Test + public void testPrioritizersBigQueue() { + final FlowFilePrioritizer iAttributePrioritizer = new FlowFilePrioritizer() { + @Override + public int compare(final FlowFile o1, final FlowFile o2) { + final int i1 = Integer.parseInt(o1.getAttribute("i")); + final int i2 = Integer.parseInt(o2.getAttribute("i")); + return Integer.compare(i1, i2); + } + }; + + queue.setPriorities(Collections.singletonList(iAttributePrioritizer)); + final int iterations = 29000; + + for (int i=0; i < iterations; i++) { + final MockFlowFile flowFile = new MockFlowFile(i); + flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i))); + queue.put(flowFile); + } + + for (int i=0; i < iterations; i++) { + final MockFlowFile flowFile = new 
MockFlowFile(i + iterations); + flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i + iterations))); + + final FlowFileRecord polled = queue.poll(Collections.emptySet(), 0L); + assertEquals(polled.getAttribute("i"), String.valueOf(i)); + + queue.put(flowFile); + } + + // Make sure that the data is pulled from the queue and added back a couple of times. + // This will trigger swapping to occur, but also leave a lot of data in memory on the queue. + // This specifically tests the edge case where data is swapped out, and we want to make sure that + // when we read from the queue, that we swap the data back in before processing anything on the + // pending 'swap queue' internally. + repopulateQueue(); + repopulateQueue(); + + int i=iterations; + FlowFileRecord flowFile; + while ((flowFile = queue.poll(Collections.emptySet(), 0)) != null) { + assertEquals(String.valueOf(i), flowFile.getAttribute("i")); + i++; + } + } + + + @Test + public void testOrderingWithCornerCases() { + final FlowFilePrioritizer iAttributePrioritizer = new FlowFilePrioritizer() { + @Override + public int compare(final FlowFile o1, final FlowFile o2) { + final int i1 = Integer.parseInt(o1.getAttribute("i")); + final int i2 = Integer.parseInt(o2.getAttribute("i")); + return Integer.compare(i1, i2); + } + }; + + queue.setPriorities(Collections.singletonList(iAttributePrioritizer)); + + for (final int queueSize : new int[] {1, 9999, 10_000, 10_001, 19_999, 20_000, 20_001}) { + System.out.println("Queue Size: " + queueSize); + + for (int i=0; i < queueSize; i++) { + final MockFlowFile flowFile = new MockFlowFile(i); + flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i))); + queue.put(flowFile); + } + + for (int i=0; i < queueSize; i++) { + final FlowFileRecord flowFile = queue.poll(Collections.emptySet(), 0); + assertEquals(String.valueOf(i), flowFile.getAttribute("i")); + } + + assertNull(queue.poll(Collections.emptySet(), 0)); + } + } + + @Test + public void 
testPrioritizerWhenOutOfOrderDataEntersSwapQueue() { + final FlowFilePrioritizer iAttributePrioritizer = new FlowFilePrioritizer() { + @Override + public int compare(final FlowFile o1, final FlowFile o2) { + final int i1 = Integer.parseInt(o1.getAttribute("i")); + final int i2 = Integer.parseInt(o2.getAttribute("i")); + return Integer.compare(i1, i2); + } + }; + + queue.setPriorities(Collections.singletonList(iAttributePrioritizer)); + + // Add 10,000 FlowFiles to the queue. These will all go to the active queue. + final int iterations = 10000; + for (int i=0; i < iterations; i++) { + final MockFlowFile flowFile = new MockFlowFile(i); + flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i))); + queue.put(flowFile); + } + + // Added 3 FlowFiles to the queue. These will all go to the Swap Queue. + for (final String iValue : new String[] {"10000", "-5", "8000"}) { + final MockFlowFile swapQueueFlowFile1 = new MockFlowFile(10_000); + swapQueueFlowFile1.putAttributes(Collections.singletonMap("i", iValue)); + queue.put(swapQueueFlowFile1); + } + + // The first 10,000 should be ordered. Then all FlowFiles on the swap queue should be transferred over, as a single unit, just as they would be in a swap file. 
+ for (int i=0; i < iterations; i++) { + final FlowFileRecord flowFile = queue.poll(Collections.emptySet(), 0); + assertEquals(String.valueOf(i), flowFile.getAttribute("i")); + } + + for (final String iValue : new String[] {"-5", "8000", "10000"}) { + final FlowFileRecord flowFile = queue.poll(Collections.emptySet(), 0); + assertEquals(iValue, flowFile.getAttribute("i")); + } + } + + @Test + public void testPrioritizersDataAddedAfterSwapOccurs() { + final FlowFilePrioritizer iAttributePrioritizer = new FlowFilePrioritizer() { + @Override + public int compare(final FlowFile o1, final FlowFile o2) { + final int i1 = Integer.parseInt(o1.getAttribute("i")); + final int i2 = Integer.parseInt(o2.getAttribute("i")); + return Integer.compare(i1, i2); + } + }; + + queue.setPriorities(Collections.singletonList(iAttributePrioritizer)); + final int iterations = 29000; + + for (int i=0; i < iterations; i++) { + final MockFlowFile flowFile = new MockFlowFile(i); + flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i))); + queue.put(flowFile); + } + + for (int i=0; i < iterations; i++) { + final MockFlowFile flowFile = new MockFlowFile(i + iterations); + flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i + iterations))); + + final FlowFileRecord polled = queue.poll(Collections.emptySet(), 0L); + assertEquals(polled.getAttribute("i"), String.valueOf(i)); + + queue.put(flowFile); + } + + // Make sure that the data is pulled from the queue and added back a couple of times. + // This will trigger swapping to occur, but also leave a lot of data in memory on the queue. + // This specifically tests the edge case where data is swapped out, and we want to make sure that + // when we read from the queue, that we swap the data back in before processing anything on the + // pending 'swap queue' internally. + repopulateQueue(); + repopulateQueue(); + + // Add enough data for another swap file to get created. 
+ final int baseI = iterations * 2; + for (int i=0; i < 10_000; i++) { + final MockFlowFile flowFile = new MockFlowFile(i); + flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(baseI + i))); + queue.put(flowFile); + } + + repopulateQueue(); + + int i=iterations; + FlowFileRecord flowFile; + while ((flowFile = queue.poll(Collections.emptySet(), 0)) != null) { + assertEquals(String.valueOf(i), flowFile.getAttribute("i")); + i++; + } + } + + private void repopulateQueue() { + final List attrs = new ArrayList<>(); + final List ffs = new ArrayList<>(); + FlowFileRecord ff; + while ((ff = queue.poll(Collections.emptySet(), 0L)) != null) { + ffs.add(ff); + attrs.add(ff.getAttribute("i")); + } + + ffs.forEach(queue::put); + System.out.println(StringUtils.join(attrs, ", ")); + } + @Test public void testSwapOutFailureLeavesCorrectQueueSize() { @@ -241,13 +430,45 @@ public class TestSwappablePriorityQueue { assertEquals(20000, queue.size().getObjectCount()); assertEquals(10000, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount()); - final List flowFiles = queue.poll(Integer.MAX_VALUE, new HashSet(), 500000); - assertEquals(10000, flowFiles.size()); + + // The first 10,000 FlowFiles to be added to the queue will be sorted by size (first 10,000 because that's the swap threshold, by size because of the prioritizer). + // The next 10,000 spill over to the swap queue. So we expect the first 10,000 FlowFiles to be size 10,000 to 20,000. Then the next 10,000 to be sized 0 to 9,999. 
+ final List firstBatch = queue.poll(Integer.MAX_VALUE, Collections.emptySet(), 0); + assertEquals(10000, firstBatch.size()); for (int i = 0; i < 10000; i++) { - assertEquals(i, flowFiles.get(i).getSize()); + assertEquals(10_000 + i, firstBatch.get(i).getSize()); } + + final List secondBatch = queue.poll(Integer.MAX_VALUE, Collections.emptySet(), 0); + assertEquals(10000, secondBatch.size()); + for (int i = 0; i < 10000; i++) { + assertEquals(i, secondBatch.get(i).getSize()); + } + } + @Test + public void testPrioritiesKeptIntactBeforeSwap() { + final List prioritizers = new ArrayList<>(); + prioritizers.add((o1, o2) -> Long.compare(o1.getSize(), o2.getSize())); + queue.setPriorities(prioritizers); + + int maxSize = 9999; + for (int i = 1; i <= maxSize; i++) { + queue.put(new MockFlowFileRecord(maxSize - i)); + } + + assertEquals(0, swapManager.swapOutCalledCount); + assertEquals(maxSize, queue.size().getObjectCount()); + + assertEquals(9999, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount()); + + FlowFileRecord flowFile; + int i=0; + while ((flowFile = queue.poll(Collections.emptySet(), 0L)) != null) { + assertEquals(i++, flowFile.getSize()); + } + } @Test public void testSwapIn() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/StandaloneSwapFileIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/StandaloneSwapFileIT.java index c93bf7ccd7..88470ff459 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/StandaloneSwapFileIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/swap/StandaloneSwapFileIT.java @@ -35,10 +35,8 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; public class 
StandaloneSwapFileIT extends FrameworkIntegrationTest { - @Test + @Test(timeout=60_000) public void testSwapOnRestart() throws ExecutionException, InterruptedException, IOException { - Thread.sleep(20000L); - final ProcessorNode generator = createProcessorNode(GenerateProcessor.class); generator.setProperties(Collections.singletonMap(GenerateProcessor.COUNT.getName(), "60000"));