From 3ed0949c5578f379cab7b90ff32778bf6296404a Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 12 Nov 2015 07:48:57 -0500 Subject: [PATCH] NIFI-1155: Ensure that when poll(FlowFileFilter, Set) is called, we properly update the indicator for whether or not queue is full Signed-off-by: joewitt --- .../controller/StandardFlowFileQueue.java | 41 ++++-- .../controller/TestStandardFlowFileQueue.java | 123 ++++++++++++++++++ 2 files changed, 150 insertions(+), 14 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index ae991c806e..dd74250274 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -80,19 +80,30 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class); private PriorityQueue activeQueue = null; + + // guarded by lock private ArrayList swapQueue = null; private final AtomicReference size = new AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0L)); private boolean swapMode = false; + + // TODO: Need to create a single object that houses these 3 and then create an AtomicReference for it and use a CAS operation to set it. private volatile String maximumQueueDataSize; private volatile long maximumQueueByteCount; private volatile long maximumQueueObjectCount; - private final EventReporter eventReporter; + // TODO: Need to create a single object that houses these 2 and then create an AtomicReference for it and use CAS operation to set it. private final AtomicLong flowFileExpirationMillis; - private final Connection connection; private final AtomicReference flowFileExpirationPeriod; + + // TODO: Need to eliminate this all together. Since we are not locking on the size, can just get the size and compare to max + private final AtomicBoolean queueFullRef = new AtomicBoolean(false); + + // TODO: Unit test better! + + private final EventReporter eventReporter; + private final Connection connection; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); private final List priorities; private final int swapThreshold; @@ -106,8 +117,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private final ProvenanceEventRepository provRepository; private final ResourceClaimManager resourceClaimManager; - private final AtomicBoolean queueFullRef = new AtomicBoolean(false); - // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK! private final ProcessScheduler scheduler; @@ -683,13 +692,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue { public List poll(final FlowFileFilter filter, final Set expiredRecords) { long bytesPulled = 0L; int flowFilesPulled = 0; + boolean queueFullAtStart = false; writeLock.lock(); try { migrateSwapToActive(); final long expirationMillis = this.flowFileExpirationMillis.get(); - final boolean queueFullAtStart = queueFullRef.get(); + queueFullAtStart = queueFullRef.get(); final List selectedFlowFiles = new ArrayList<>(); final List unselected = new ArrayList<>(); @@ -735,17 +745,20 @@ public final class StandardFlowFileQueue implements FlowFileQueue { this.activeQueue.addAll(unselected); - // if at least 1 FlowFile was expired & the queue was full before we started, then - // we need to determine whether or not the queue is full again. If no FlowFile was expired, - // then the queue will still be full until the appropriate #acknowledge method is called. - if (queueFullAtStart && !expiredRecords.isEmpty()) { - queueFullRef.set(determineIfFull()); - } - return selectedFlowFiles; } finally { - incrementActiveQueueSize(-flowFilesPulled, -bytesPulled); - writeLock.unlock("poll(Filter, Set)"); + try { + incrementActiveQueueSize(-flowFilesPulled, -bytesPulled); + + // if at least 1 FlowFile was expired & the queue was full before we started, then + // we need to determine whether or not the queue is full again. If no FlowFile was expired, + // then the queue will still be full until the appropriate #acknowledge method is called. + if (queueFullAtStart && !expiredRecords.isEmpty()) { + queueFullRef.set(determineIfFull()); + } + } finally { + writeLock.unlock("poll(Filter, Set)"); + } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index 8b8c678672..61f96fd7a3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -18,6 +18,7 @@ package org.apache.nifi.controller; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -48,6 +49,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.junit.Before; @@ -106,6 +108,127 @@ public class TestStandardFlowFileQueue { assertEquals(0L, unackSize.getByteCount()); } + @Test + public void testBackPressure() { + queue.setBackPressureObjectThreshold(10); + + assertTrue(queue.isEmpty()); + assertTrue(queue.isActiveQueueEmpty()); + assertFalse(queue.isFull()); + + for (int i = 0; i < 9; i++) { + queue.put(new TestFlowFile()); + assertFalse(queue.isFull()); + assertFalse(queue.isEmpty()); + assertFalse(queue.isActiveQueueEmpty()); + } + + queue.put(new TestFlowFile()); + assertTrue(queue.isFull()); + assertFalse(queue.isEmpty()); + assertFalse(queue.isActiveQueueEmpty()); + + final Set expiredRecords = new HashSet<>(); + final FlowFileRecord polled = queue.poll(expiredRecords); + assertNotNull(polled); + assertTrue(expiredRecords.isEmpty()); + + assertFalse(queue.isEmpty()); + assertFalse(queue.isActiveQueueEmpty()); + + // queue is still full because FlowFile has not yet been acknowledged. + assertTrue(queue.isFull()); + queue.acknowledge(polled); + + // FlowFile has been acknowledged; queue should no longer be full. + assertFalse(queue.isFull()); + assertFalse(queue.isEmpty()); + assertFalse(queue.isActiveQueueEmpty()); + } + + @Test + public void testBackPressureAfterPollFilter() throws InterruptedException { + queue.setBackPressureObjectThreshold(10); + queue.setFlowFileExpiration("10 millis"); + + for (int i = 0; i < 9; i++) { + queue.put(new TestFlowFile()); + assertFalse(queue.isFull()); + } + + queue.put(new TestFlowFile()); + assertTrue(queue.isFull()); + + Thread.sleep(100L); + + + final FlowFileFilter filter = new FlowFileFilter() { + @Override + public FlowFileFilterResult filter(final FlowFile flowFile) { + return FlowFileFilterResult.REJECT_AND_CONTINUE; + } + }; + + final Set expiredRecords = new HashSet<>(); + final List polled = queue.poll(filter, expiredRecords); + assertTrue(polled.isEmpty()); + assertEquals(10, expiredRecords.size()); + + assertFalse(queue.isFull()); + assertTrue(queue.isEmpty()); + assertTrue(queue.isActiveQueueEmpty()); + } + + @Test + public void testBackPressureAfterPollSingle() throws InterruptedException { + queue.setBackPressureObjectThreshold(10); + queue.setFlowFileExpiration("10 millis"); + + for (int i = 0; i < 9; i++) { + queue.put(new TestFlowFile()); + assertFalse(queue.isFull()); + } + + queue.put(new TestFlowFile()); + assertTrue(queue.isFull()); + + Thread.sleep(100L); + + final Set expiredRecords = new HashSet<>(); + final FlowFileRecord polled = queue.poll(expiredRecords); + assertNull(polled); + assertEquals(10, expiredRecords.size()); + + assertFalse(queue.isFull()); + assertTrue(queue.isEmpty()); + assertTrue(queue.isActiveQueueEmpty()); + } + + @Test + public void testBackPressureAfterPollMultiple() throws InterruptedException { + queue.setBackPressureObjectThreshold(10); + queue.setFlowFileExpiration("10 millis"); + + for (int i = 0; i < 9; i++) { + queue.put(new TestFlowFile()); + assertFalse(queue.isFull()); + } + + queue.put(new TestFlowFile()); + assertTrue(queue.isFull()); + + Thread.sleep(100L); + + final Set expiredRecords = new HashSet<>(); + final List polled = queue.poll(10, expiredRecords); + assertTrue(polled.isEmpty()); + assertEquals(10, expiredRecords.size()); + + assertFalse(queue.isFull()); + assertTrue(queue.isEmpty()); + assertTrue(queue.isActiveQueueEmpty()); + } + @Test public void testSwapOutOccurs() { for (int i = 0; i < 10000; i++) {