mirror of https://github.com/apache/nifi.git
NIFI-7011: This closes #3983. SwappablePriorityQueue contains two internal data structures: activeQueue, swapQueue. activeQueue is intended to be pulled from for processing. swapQueue is intended to hold FlowFiles that are waiting to be swapped out. We want to ensure that we first swap in any data that has already been swapped out before processing the swap queue, in order to ensure that we process the data in the correct order. This fix addresses an issue where data was being swapped out by writing the lowest priority data to a swap file, then adding the highest priority data to activeQueue and the 'middle' priority data back to swapQueue. As a result, when polling from the queue we got highest priority data, followed by lowest priority data, followed by middle priority data. This is addressed by avoiding putting anything back on swapQueue when we swap out. Instead, write data to the swap file, then push everything else to activeQueue. This way, any new data that comes in will still go to the swapQueue, as it should, but all data that didn't get written to the swap file will be processed before the low priority data in the swap file.
NIFI-7011: Addressed corner case where data could still be inserted out of order if added while swapping was taking place. NIFI-7011: Fixed ordering issue with swap queue that can occur if data is migrated from swap queue to active queue instead of being swapped out.
This commit is contained in:
parent
4e2b61efe4
commit
66d5ab80eb
|
@ -162,6 +162,9 @@ public class SwappablePriorityQueue {
|
|||
}
|
||||
|
||||
migrateSwapToActive();
|
||||
if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) {
|
||||
return;
|
||||
}
|
||||
|
||||
final int numSwapFiles = swapQueue.size() / SWAP_RECORD_POLL_SIZE;
|
||||
|
||||
|
@ -171,10 +174,11 @@ public class SwappablePriorityQueue {
|
|||
originalSwapQueueBytes += flowFile.getSize();
|
||||
}
|
||||
|
||||
// Create a new Priority queue with the prioritizers that are set, but reverse the
|
||||
// prioritizers because we want to pull the lowest-priority FlowFiles to swap out
|
||||
final PriorityQueue<FlowFileRecord> tempQueue = new PriorityQueue<>(activeQueue.size() + swapQueue.size(), Collections.reverseOrder(new QueuePrioritizer(getPriorities())));
|
||||
tempQueue.addAll(activeQueue);
|
||||
// Create a new Priority queue with the same prioritizers that are set for this queue. We want to swap out the highest priority data first, because
|
||||
// whatever data we don't write out to a swap file (because there isn't enough to fill a swap file) will be added back to the swap queue.
|
||||
// Since the swap queue cannot be processed until all swap files have been swapped back in, we want to ensure that only the lowest priority data goes back onto it. Which means
|
||||
// that we must swap out the highest priority data that is currently on the swap queue.
|
||||
final PriorityQueue<FlowFileRecord> tempQueue = new PriorityQueue<>(swapQueue.size(), new QueuePrioritizer(getPriorities()));
|
||||
tempQueue.addAll(swapQueue);
|
||||
|
||||
long bytesSwappedOut = 0L;
|
||||
|
@ -221,23 +225,14 @@ public class SwappablePriorityQueue {
|
|||
// swap queue. Then add the records back to the active queue.
|
||||
swapQueue.clear();
|
||||
long updatedSwapQueueBytes = 0L;
|
||||
while (tempQueue.size() > swapThreshold) {
|
||||
final FlowFileRecord record = tempQueue.poll();
|
||||
FlowFileRecord record;
|
||||
while ((record = tempQueue.poll()) != null) {
|
||||
swapQueue.add(record);
|
||||
updatedSwapQueueBytes += record.getSize();
|
||||
}
|
||||
|
||||
Collections.reverse(swapQueue); // currently ordered in reverse priority order based on the ordering of the temp queue
|
||||
|
||||
// replace the contents of the active queue, since we've merged it with the swap queue.
|
||||
activeQueue.clear();
|
||||
FlowFileRecord toRequeue;
|
||||
long activeQueueBytes = 0L;
|
||||
while ((toRequeue = tempQueue.poll()) != null) {
|
||||
activeQueue.offer(toRequeue);
|
||||
activeQueueBytes += toRequeue.getSize();
|
||||
}
|
||||
|
||||
boolean updated = false;
|
||||
while (!updated) {
|
||||
final FlowFileQueueSize originalSize = getFlowFileQueueSize();
|
||||
|
@ -245,13 +240,13 @@ public class SwappablePriorityQueue {
|
|||
final int addedSwapRecords = swapQueue.size() - originalSwapQueueCount;
|
||||
final long addedSwapBytes = updatedSwapQueueBytes - originalSwapQueueBytes;
|
||||
|
||||
final FlowFileQueueSize newSize = new FlowFileQueueSize(activeQueue.size(), activeQueueBytes,
|
||||
final FlowFileQueueSize newSize = new FlowFileQueueSize(originalSize.getActiveCount(), originalSize.getActiveBytes(),
|
||||
originalSize.getSwappedCount() + addedSwapRecords + flowFilesSwappedOut,
|
||||
originalSize.getSwappedBytes() + addedSwapBytes + bytesSwappedOut,
|
||||
originalSize.getSwapFileCount() + numSwapFiles,
|
||||
originalSize.getUnacknowledgedCount(), originalSize.getUnacknowledgedBytes());
|
||||
updated = updateSize(originalSize, newSize);
|
||||
|
||||
updated = updateSize(originalSize, newSize);
|
||||
if (updated) {
|
||||
logIfNegative(originalSize, newSize, "swap");
|
||||
}
|
||||
|
@ -286,9 +281,7 @@ public class SwappablePriorityQueue {
|
|||
// Calling this method when records are polled prevents this condition by migrating FlowFiles from the
|
||||
// Swap Queue to the Active Queue. However, we don't do this if there are FlowFiles already swapped out
|
||||
// to disk, because we want them to be swapped back in in the same order that they were swapped out.
|
||||
|
||||
final int activeQueueSize = activeQueue.size();
|
||||
if (activeQueueSize > 0 && activeQueueSize > swapThreshold - SWAP_RECORD_POLL_SIZE) {
|
||||
if (!activeQueue.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -315,20 +308,33 @@ public class SwappablePriorityQueue {
|
|||
return;
|
||||
}
|
||||
|
||||
// Swap Queue is not currently ordered. We want to migrate the highest priority FlowFiles to the Active Queue, then re-queue the lowest priority items.
|
||||
final PriorityQueue<FlowFileRecord> tempQueue = new PriorityQueue<>(swapQueue.size(), new QueuePrioritizer(getPriorities()));
|
||||
tempQueue.addAll(swapQueue);
|
||||
|
||||
int recordsMigrated = 0;
|
||||
long bytesMigrated = 0L;
|
||||
final Iterator<FlowFileRecord> swapItr = swapQueue.iterator();
|
||||
while (activeQueue.size() < swapThreshold && swapItr.hasNext()) {
|
||||
final FlowFileRecord toMigrate = swapItr.next();
|
||||
while (activeQueue.size() < swapThreshold) {
|
||||
final FlowFileRecord toMigrate = tempQueue.poll();
|
||||
if (toMigrate == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
activeQueue.add(toMigrate);
|
||||
bytesMigrated += toMigrate.getSize();
|
||||
recordsMigrated++;
|
||||
swapItr.remove();
|
||||
}
|
||||
|
||||
swapQueue.clear();
|
||||
FlowFileRecord toRequeue;
|
||||
while ((toRequeue = tempQueue.poll()) != null) {
|
||||
swapQueue.add(toRequeue);
|
||||
}
|
||||
|
||||
if (recordsMigrated > 0) {
|
||||
incrementActiveQueueSize(recordsMigrated, bytesMigrated);
|
||||
incrementSwapQueueSize(-recordsMigrated, -bytesMigrated, 0);
|
||||
logger.debug("Migrated {} FlowFiles from swap queue to active queue for {}", recordsMigrated, this);
|
||||
}
|
||||
|
||||
if (size.getSwappedCount() == 0) {
|
||||
|
|
|
@ -56,7 +56,6 @@ import org.apache.nifi.provenance.StandardProvenanceEventRecord;
|
|||
import org.apache.nifi.stream.io.ByteCountingInputStream;
|
||||
import org.apache.nifi.stream.io.ByteCountingOutputStream;
|
||||
import org.apache.nifi.stream.io.StreamUtils;
|
||||
import org.rocksdb.Checkpoint;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
|
@ -314,28 +314,6 @@ public class TestStandardFlowFileQueue {
|
|||
assertEquals(10000, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLowestPrioritySwappedOutFirst() {
|
||||
final List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
|
||||
prioritizers.add(new FlowFileSizePrioritizer());
|
||||
queue.setPriorities(prioritizers);
|
||||
|
||||
long maxSize = 20000;
|
||||
for (int i = 1; i <= 20000; i++) {
|
||||
queue.put(new MockFlowFileRecord(maxSize - i));
|
||||
}
|
||||
|
||||
assertEquals(1, swapManager.swapOutCalledCount);
|
||||
assertEquals(20000, queue.size().getObjectCount());
|
||||
|
||||
assertEquals(10000, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
|
||||
final List<FlowFileRecord> flowFiles = queue.poll(Integer.MAX_VALUE, new HashSet<FlowFileRecord>());
|
||||
assertEquals(10000, flowFiles.size());
|
||||
for (int i = 0; i < 10000; i++) {
|
||||
assertEquals(i, flowFiles.get(i).getSize());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSwapIn() {
|
||||
for (int i = 1; i <= 20000; i++) {
|
||||
|
|
|
@ -26,9 +26,11 @@ import org.apache.nifi.controller.queue.QueueSize;
|
|||
import org.apache.nifi.controller.queue.SwappablePriorityQueue;
|
||||
import org.apache.nifi.controller.repository.FlowFileRecord;
|
||||
import org.apache.nifi.events.EventReporter;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.flowfile.FlowFilePrioritizer;
|
||||
import org.apache.nifi.reporting.Severity;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -78,6 +80,193 @@ public class TestSwappablePriorityQueue {
|
|||
queue = new SwappablePriorityQueue(swapManager, 10000, eventReporter, flowFileQueue, dropAction, "local");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrioritizersBigQueue() {
|
||||
final FlowFilePrioritizer iAttributePrioritizer = new FlowFilePrioritizer() {
|
||||
@Override
|
||||
public int compare(final FlowFile o1, final FlowFile o2) {
|
||||
final int i1 = Integer.parseInt(o1.getAttribute("i"));
|
||||
final int i2 = Integer.parseInt(o2.getAttribute("i"));
|
||||
return Integer.compare(i1, i2);
|
||||
}
|
||||
};
|
||||
|
||||
queue.setPriorities(Collections.singletonList(iAttributePrioritizer));
|
||||
final int iterations = 29000;
|
||||
|
||||
for (int i=0; i < iterations; i++) {
|
||||
final MockFlowFile flowFile = new MockFlowFile(i);
|
||||
flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i)));
|
||||
queue.put(flowFile);
|
||||
}
|
||||
|
||||
for (int i=0; i < iterations; i++) {
|
||||
final MockFlowFile flowFile = new MockFlowFile(i + iterations);
|
||||
flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i + iterations)));
|
||||
|
||||
final FlowFileRecord polled = queue.poll(Collections.emptySet(), 0L);
|
||||
assertEquals(polled.getAttribute("i"), String.valueOf(i));
|
||||
|
||||
queue.put(flowFile);
|
||||
}
|
||||
|
||||
// Make sure that the data is pulled from the queue and added back a couple of times.
|
||||
// This will trigger swapping to occur, but also leave a lot of data in memory on the queue.
|
||||
// This specifically tests the edge case where data is swapped out, and we want to make sure that
|
||||
// when we read from the queue, that we swap the data back in before processing anything on the
|
||||
// pending 'swap queue' internally.
|
||||
repopulateQueue();
|
||||
repopulateQueue();
|
||||
|
||||
int i=iterations;
|
||||
FlowFileRecord flowFile;
|
||||
while ((flowFile = queue.poll(Collections.emptySet(), 0)) != null) {
|
||||
assertEquals(String.valueOf(i), flowFile.getAttribute("i"));
|
||||
i++;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testOrderingWithCornerCases() {
|
||||
final FlowFilePrioritizer iAttributePrioritizer = new FlowFilePrioritizer() {
|
||||
@Override
|
||||
public int compare(final FlowFile o1, final FlowFile o2) {
|
||||
final int i1 = Integer.parseInt(o1.getAttribute("i"));
|
||||
final int i2 = Integer.parseInt(o2.getAttribute("i"));
|
||||
return Integer.compare(i1, i2);
|
||||
}
|
||||
};
|
||||
|
||||
queue.setPriorities(Collections.singletonList(iAttributePrioritizer));
|
||||
|
||||
for (final int queueSize : new int[] {1, 9999, 10_000, 10_001, 19_999, 20_000, 20_001}) {
|
||||
System.out.println("Queue Size: " + queueSize);
|
||||
|
||||
for (int i=0; i < queueSize; i++) {
|
||||
final MockFlowFile flowFile = new MockFlowFile(i);
|
||||
flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i)));
|
||||
queue.put(flowFile);
|
||||
}
|
||||
|
||||
for (int i=0; i < queueSize; i++) {
|
||||
final FlowFileRecord flowFile = queue.poll(Collections.emptySet(), 0);
|
||||
assertEquals(String.valueOf(i), flowFile.getAttribute("i"));
|
||||
}
|
||||
|
||||
assertNull(queue.poll(Collections.emptySet(), 0));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrioritizerWhenOutOfOrderDataEntersSwapQueue() {
|
||||
final FlowFilePrioritizer iAttributePrioritizer = new FlowFilePrioritizer() {
|
||||
@Override
|
||||
public int compare(final FlowFile o1, final FlowFile o2) {
|
||||
final int i1 = Integer.parseInt(o1.getAttribute("i"));
|
||||
final int i2 = Integer.parseInt(o2.getAttribute("i"));
|
||||
return Integer.compare(i1, i2);
|
||||
}
|
||||
};
|
||||
|
||||
queue.setPriorities(Collections.singletonList(iAttributePrioritizer));
|
||||
|
||||
// Add 10,000 FlowFiles to the queue. These will all go to the active queue.
|
||||
final int iterations = 10000;
|
||||
for (int i=0; i < iterations; i++) {
|
||||
final MockFlowFile flowFile = new MockFlowFile(i);
|
||||
flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i)));
|
||||
queue.put(flowFile);
|
||||
}
|
||||
|
||||
// Add 3 FlowFiles to the queue. These will all go to the Swap Queue.
|
||||
for (final String iValue : new String[] {"10000", "-5", "8000"}) {
|
||||
final MockFlowFile swapQueueFlowFile1 = new MockFlowFile(10_000);
|
||||
swapQueueFlowFile1.putAttributes(Collections.singletonMap("i", iValue));
|
||||
queue.put(swapQueueFlowFile1);
|
||||
}
|
||||
|
||||
// The first 10,000 should be ordered. Then all FlowFiles on the swap queue should be transferred over, as a single unit, just as they would be in a swap file.
|
||||
for (int i=0; i < iterations; i++) {
|
||||
final FlowFileRecord flowFile = queue.poll(Collections.emptySet(), 0);
|
||||
assertEquals(String.valueOf(i), flowFile.getAttribute("i"));
|
||||
}
|
||||
|
||||
for (final String iValue : new String[] {"-5", "8000", "10000"}) {
|
||||
final FlowFileRecord flowFile = queue.poll(Collections.emptySet(), 0);
|
||||
assertEquals(iValue, flowFile.getAttribute("i"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrioritizersDataAddedAfterSwapOccurs() {
|
||||
final FlowFilePrioritizer iAttributePrioritizer = new FlowFilePrioritizer() {
|
||||
@Override
|
||||
public int compare(final FlowFile o1, final FlowFile o2) {
|
||||
final int i1 = Integer.parseInt(o1.getAttribute("i"));
|
||||
final int i2 = Integer.parseInt(o2.getAttribute("i"));
|
||||
return Integer.compare(i1, i2);
|
||||
}
|
||||
};
|
||||
|
||||
queue.setPriorities(Collections.singletonList(iAttributePrioritizer));
|
||||
final int iterations = 29000;
|
||||
|
||||
for (int i=0; i < iterations; i++) {
|
||||
final MockFlowFile flowFile = new MockFlowFile(i);
|
||||
flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i)));
|
||||
queue.put(flowFile);
|
||||
}
|
||||
|
||||
for (int i=0; i < iterations; i++) {
|
||||
final MockFlowFile flowFile = new MockFlowFile(i + iterations);
|
||||
flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(i + iterations)));
|
||||
|
||||
final FlowFileRecord polled = queue.poll(Collections.emptySet(), 0L);
|
||||
assertEquals(polled.getAttribute("i"), String.valueOf(i));
|
||||
|
||||
queue.put(flowFile);
|
||||
}
|
||||
|
||||
// Make sure that the data is pulled from the queue and added back a couple of times.
|
||||
// This will trigger swapping to occur, but also leave a lot of data in memory on the queue.
|
||||
// This specifically tests the edge case where data is swapped out, and we want to make sure that
|
||||
// when we read from the queue, that we swap the data back in before processing anything on the
|
||||
// pending 'swap queue' internally.
|
||||
repopulateQueue();
|
||||
repopulateQueue();
|
||||
|
||||
// Add enough data for another swap file to get created.
|
||||
final int baseI = iterations * 2;
|
||||
for (int i=0; i < 10_000; i++) {
|
||||
final MockFlowFile flowFile = new MockFlowFile(i);
|
||||
flowFile.putAttributes(Collections.singletonMap("i", String.valueOf(baseI + i)));
|
||||
queue.put(flowFile);
|
||||
}
|
||||
|
||||
repopulateQueue();
|
||||
|
||||
int i=iterations;
|
||||
FlowFileRecord flowFile;
|
||||
while ((flowFile = queue.poll(Collections.emptySet(), 0)) != null) {
|
||||
assertEquals(String.valueOf(i), flowFile.getAttribute("i"));
|
||||
i++;
|
||||
}
|
||||
}
|
||||
|
||||
private void repopulateQueue() {
|
||||
final List<String> attrs = new ArrayList<>();
|
||||
final List<FlowFileRecord> ffs = new ArrayList<>();
|
||||
FlowFileRecord ff;
|
||||
while ((ff = queue.poll(Collections.emptySet(), 0L)) != null) {
|
||||
ffs.add(ff);
|
||||
attrs.add(ff.getAttribute("i"));
|
||||
}
|
||||
|
||||
ffs.forEach(queue::put);
|
||||
System.out.println(StringUtils.join(attrs, ", "));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSwapOutFailureLeavesCorrectQueueSize() {
|
||||
|
@ -241,13 +430,45 @@ public class TestSwappablePriorityQueue {
|
|||
assertEquals(20000, queue.size().getObjectCount());
|
||||
|
||||
assertEquals(10000, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount());
|
||||
final List<FlowFileRecord> flowFiles = queue.poll(Integer.MAX_VALUE, new HashSet<FlowFileRecord>(), 500000);
|
||||
assertEquals(10000, flowFiles.size());
|
||||
|
||||
// The first 10,000 FlowFiles to be added to the queue will be sorted by size (first 10,000 because that's the swap threshold, by size because of the prioritizer).
|
||||
// The next 10,000 spill over to the swap queue. So we expect the first 10,000 FlowFiles to be size 10,000 to 19,999. Then the next 10,000 to be sized 0 to 9,999.
|
||||
final List<FlowFileRecord> firstBatch = queue.poll(Integer.MAX_VALUE, Collections.emptySet(), 0);
|
||||
assertEquals(10000, firstBatch.size());
|
||||
for (int i = 0; i < 10000; i++) {
|
||||
assertEquals(i, flowFiles.get(i).getSize());
|
||||
assertEquals(10_000 + i, firstBatch.get(i).getSize());
|
||||
}
|
||||
|
||||
final List<FlowFileRecord> secondBatch = queue.poll(Integer.MAX_VALUE, Collections.emptySet(), 0);
|
||||
assertEquals(10000, secondBatch.size());
|
||||
for (int i = 0; i < 10000; i++) {
|
||||
assertEquals(i, secondBatch.get(i).getSize());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrioritiesKeptIntactBeforeSwap() {
|
||||
final List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
|
||||
prioritizers.add((o1, o2) -> Long.compare(o1.getSize(), o2.getSize()));
|
||||
queue.setPriorities(prioritizers);
|
||||
|
||||
int maxSize = 9999;
|
||||
for (int i = 1; i <= maxSize; i++) {
|
||||
queue.put(new MockFlowFileRecord(maxSize - i));
|
||||
}
|
||||
|
||||
assertEquals(0, swapManager.swapOutCalledCount);
|
||||
assertEquals(maxSize, queue.size().getObjectCount());
|
||||
|
||||
assertEquals(9999, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount());
|
||||
|
||||
FlowFileRecord flowFile;
|
||||
int i=0;
|
||||
while ((flowFile = queue.poll(Collections.emptySet(), 0L)) != null) {
|
||||
assertEquals(i++, flowFile.getSize());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSwapIn() {
|
||||
|
|
|
@ -35,10 +35,8 @@ import static org.testng.Assert.assertNotNull;
|
|||
import static org.testng.Assert.assertNull;
|
||||
|
||||
public class StandaloneSwapFileIT extends FrameworkIntegrationTest {
|
||||
@Test
|
||||
@Test(timeout=60_000)
|
||||
public void testSwapOnRestart() throws ExecutionException, InterruptedException, IOException {
|
||||
Thread.sleep(20000L);
|
||||
|
||||
final ProcessorNode generator = createProcessorNode(GenerateProcessor.class);
|
||||
generator.setProperties(Collections.singletonMap(GenerateProcessor.COUNT.getName(), "60000"));
|
||||
|
||||
|
|
Loading…
Reference in New Issue