diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index dd74250274..5dce801970 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -32,8 +32,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -88,19 +86,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private boolean swapMode = false; - // TODO: Need to create a single object that houses these 3 and then create an AtomicReference for it and use a CAS operation to set it. - private volatile String maximumQueueDataSize; - private volatile long maximumQueueByteCount; - private volatile long maximumQueueObjectCount; - - // TODO: Need to create a single object that houses these 2 and then create an AtomicReference for it and use CAS operation to set it. - private final AtomicLong flowFileExpirationMillis; - private final AtomicReference flowFileExpirationPeriod; - - // TODO: Need to eliminate this all together. Since we are not locking on the size, can just get the size and compare to max - private final AtomicBoolean queueFullRef = new AtomicBoolean(false); - - // TODO: Unit test better! + private final AtomicReference maxQueueSize = new AtomicReference<>(new MaxQueueSize("0 MB", 0L, 0L)); + private final AtomicReference expirationPeriod = new AtomicReference<>(new TimePeriod("0 mins", 0L)); private final EventReporter eventReporter; private final Connection connection; @@ -124,11 +111,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) { activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList())); priorities = new ArrayList<>(); - maximumQueueObjectCount = 0L; - maximumQueueDataSize = "0 MB"; - maximumQueueByteCount = 0L; - flowFileExpirationMillis = new AtomicLong(0); - flowFileExpirationPeriod = new AtomicReference<>("0 mins"); swapQueue = new ArrayList<>(); this.eventReporter = eventReporter; this.swapManager = swapManager; @@ -170,36 +152,35 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } @Override - public void setBackPressureObjectThreshold(final long maxQueueSize) { - writeLock.lock(); - try { - maximumQueueObjectCount = maxQueueSize; - this.queueFullRef.set(determineIfFull()); - } finally { - writeLock.unlock("setBackPressureObjectThreshold"); + public void setBackPressureObjectThreshold(final long threshold) { + boolean updated = false; + while (!updated) { + MaxQueueSize maxSize = maxQueueSize.get(); + final MaxQueueSize updatedSize = new MaxQueueSize(maxSize.getMaxSize(), maxSize.getMaxBytes(), threshold); + updated = maxQueueSize.compareAndSet(maxSize, updatedSize); } } @Override public long getBackPressureObjectThreshold() { - return maximumQueueObjectCount; + return maxQueueSize.get().getMaxCount(); } @Override public void setBackPressureDataSizeThreshold(final String maxDataSize) { - writeLock.lock(); - try { - maximumQueueByteCount = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue(); - maximumQueueDataSize = maxDataSize; - this.queueFullRef.set(determineIfFull()); - } finally { - writeLock.unlock("setBackPressureDataSizeThreshold"); + final long maxBytes = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue(); + + boolean updated = false; + while (!updated) { + MaxQueueSize maxSize = maxQueueSize.get(); + final MaxQueueSize updatedSize = new MaxQueueSize(maxDataSize, maxBytes, maxSize.getMaxCount()); + updated = maxQueueSize.compareAndSet(maxSize, updatedSize); } } @Override public String getBackPressureDataSizeThreshold() { - return maximumQueueDataSize; + return maxQueueSize.get().getMaxSize(); } @Override @@ -229,17 +210,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { @Override public void acknowledge(final FlowFileRecord flowFile) { - if (queueFullRef.get()) { - writeLock.lock(); - try { - incrementUnacknowledgedQueueSize(-1, -flowFile.getSize()); - queueFullRef.set(determineIfFull()); - } finally { - writeLock.unlock("acknowledge(FlowFileRecord)"); - } - } else { - incrementUnacknowledgedQueueSize(-1, -flowFile.getSize()); - } + incrementUnacknowledgedQueueSize(-1, -flowFile.getSize()); if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { // queue was full but no longer is. Notify that the source may now be available to run, @@ -255,17 +226,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { totalSize += flowFile.getSize(); } - if (queueFullRef.get()) { - writeLock.lock(); - try { - incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize); - queueFullRef.set(determineIfFull()); - } finally { - writeLock.unlock("acknowledge(FlowFileRecord)"); - } - } else { - incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize); - } + incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize); if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { // it's possible that queue was full but no longer is. Notify that the source may now be available to run, @@ -276,33 +237,26 @@ public final class StandardFlowFileQueue implements FlowFileQueue { @Override public boolean isFull() { - return queueFullRef.get(); - } + final MaxQueueSize maxSize = maxQueueSize.get(); - /** - * MUST be called with either the read or write lock held - * - * @return true if full - */ - private boolean determineIfFull() { - final long maxSize = maximumQueueObjectCount; - final long maxBytes = maximumQueueByteCount; - if (maxSize <= 0 && maxBytes <= 0) { + // Check if max size is set + if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) { return false; } final QueueSize queueSize = getQueueSize(); - if (maxSize > 0 && queueSize.getObjectCount() >= maxSize) { + if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) { return true; } - if (maxBytes > 0 && queueSize.getByteCount() >= maxBytes) { + if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >= maxSize.getMaxBytes()) { return true; } return false; } + @Override public void put(final FlowFileRecord file) { writeLock.lock(); @@ -316,8 +270,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { incrementActiveQueueSize(1, file.getSize()); activeQueue.add(file); } - - queueFullRef.set(determineIfFull()); } finally { writeLock.unlock("put(FlowFileRecord)"); } @@ -346,8 +298,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { incrementActiveQueueSize(numFiles, bytes); activeQueue.addAll(files); } - - queueFullRef.set(determineIfFull()); } finally { writeLock.unlock("putAll"); } @@ -383,7 +333,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { FlowFileRecord flowFile = null; // First check if we have any records Pre-Fetched. - final long expirationMillis = flowFileExpirationMillis.get(); + final long expirationMillis = expirationPeriod.get().getMillis(); writeLock.lock(); try { flowFile = doPoll(expiredRecords, expirationMillis); @@ -402,10 +352,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { boolean isExpired; migrateSwapToActive(); - final boolean queueFullAtStart = queueFullRef.get(); long expiredBytes = 0L; - do { flowFile = this.activeQueue.poll(); @@ -433,13 +381,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes); } - // if at least 1 FlowFile was expired & the queue was full before we started, then - // we need to determine whether or not the queue is full again. If no FlowFile was expired, - // then the queue will still be full until the appropriate #acknowledge method is called. - if (queueFullAtStart && !expiredRecords.isEmpty()) { - queueFullRef.set(determineIfFull()); - } - return flowFile; } @@ -460,8 +401,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private void doPoll(final List records, int maxResults, final Set expiredRecords) { migrateSwapToActive(); - final boolean queueFullAtStart = queueFullRef.get(); - final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords); long expiredBytes = 0L; @@ -471,13 +410,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { incrementActiveQueueSize(-(expiredRecords.size() + records.size()), -bytesDrained); incrementUnacknowledgedQueueSize(records.size(), bytesDrained - expiredBytes); - - // if at least 1 FlowFile was expired & the queue was full before we started, then - // we need to determine whether or not the queue is full again. If no FlowFile was expired, - // then the queue will still be full until the appropriate #acknowledge method is called. - if (queueFullAtStart && !expiredRecords.isEmpty()) { - queueFullRef.set(determineIfFull()); - } } /** @@ -669,7 +601,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { long drainedSize = 0L; FlowFileRecord pulled = null; - final long expirationMillis = this.flowFileExpirationMillis.get(); + final long expirationMillis = expirationPeriod.get().getMillis(); while (destination.size() < maxResults && (pulled = sourceQueue.poll()) != null) { if (isLaterThan(getExpirationDate(pulled, expirationMillis))) { expiredRecords.add(pulled); @@ -692,14 +624,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue { public List poll(final FlowFileFilter filter, final Set expiredRecords) { long bytesPulled = 0L; int flowFilesPulled = 0; - boolean queueFullAtStart = false; writeLock.lock(); try { migrateSwapToActive(); - final long expirationMillis = this.flowFileExpirationMillis.get(); - queueFullAtStart = queueFullRef.get(); + final long expirationMillis = expirationPeriod.get().getMillis(); final List selectedFlowFiles = new ArrayList<>(); final List unselected = new ArrayList<>(); @@ -744,21 +674,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } this.activeQueue.addAll(unselected); + incrementActiveQueueSize(-flowFilesPulled, -bytesPulled); return selectedFlowFiles; } finally { - try { - incrementActiveQueueSize(-flowFilesPulled, -bytesPulled); - - // if at least 1 FlowFile was expired & the queue was full before we started, then - // we need to determine whether or not the queue is full again. If no FlowFile was expired, - // then the queue will still be full until the appropriate #acknowledge method is called. - if (queueFullAtStart && !expiredRecords.isEmpty()) { - queueFullRef.set(determineIfFull()); - } - } finally { - writeLock.unlock("poll(Filter, Set)"); - } + writeLock.unlock("poll(Filter, Set)"); } } @@ -830,12 +750,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue { @Override public String getFlowFileExpiration() { - return flowFileExpirationPeriod.get(); + return expirationPeriod.get().getPeriod(); } @Override public int getFlowFileExpiration(final TimeUnit timeUnit) { - return (int) timeUnit.convert(flowFileExpirationMillis.get(), TimeUnit.MILLISECONDS); + return (int) timeUnit.convert(expirationPeriod.get().getMillis(), TimeUnit.MILLISECONDS); } @Override @@ -844,8 +764,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { if (millis < 0) { throw new IllegalArgumentException("FlowFile Expiration Period must be positive"); } - this.flowFileExpirationPeriod.set(flowExpirationPeriod); - this.flowFileExpirationMillis.set(millis); + + expirationPeriod.set(new TimePeriod(flowExpirationPeriod, millis)); } @@ -1300,4 +1220,57 @@ public final class StandardFlowFileQueue implements FlowFileQueue { " Bytes], Unacknowledged=[" + unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]"; } } + + + private static class MaxQueueSize { + private final String maxSize; + private final long maxBytes; + private final long maxCount; + + public MaxQueueSize(final String maxSize, final long maxBytes, final long maxCount) { + this.maxSize = maxSize; + this.maxBytes = maxBytes; + this.maxCount = maxCount; + } + + public String getMaxSize() { + return maxSize; + } + + public long getMaxBytes() { + return maxBytes; + } + + public long getMaxCount() { + return maxCount; + } + + @Override + public String toString() { + return maxCount + " Objects/" + maxSize; + } + } + + private static class TimePeriod { + private final String period; + private final long millis; + + public TimePeriod(final String period, final long millis) { + this.period = period; + this.millis = millis; + } + + public String getPeriod() { + return period; + } + + public long getMillis() { + return millis; + } + + @Override + public String toString() { + return period; + } + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index 61f96fd7a3..09ac7f2a27 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -50,18 +50,27 @@ import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.FlowFileFilter; +import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestStandardFlowFileQueue { private TestSwapManager swapManager = null; private StandardFlowFileQueue queue = null; + private List provRecords = new ArrayList<>(); + @Before + @SuppressWarnings("unchecked") public void setup() { + provRecords.clear(); + final Connection connection = Mockito.mock(Connection.class); Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class)); Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class)); @@ -74,6 +83,16 @@ public class TestStandardFlowFileQueue { final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class); Mockito.when(provRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder()); + Mockito.doAnswer(new Answer() { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable { + final Iterable iterable = (Iterable) invocation.getArguments()[0]; + for (final ProvenanceEventRecord record : iterable) { + provRecords.add(record); + } + return null; + } + }).when(provRepo).registerEvents(Mockito.any(Iterable.class)); queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000); TestFlowFile.idGenerator.set(0L); @@ -179,6 +198,39 @@ public class TestStandardFlowFileQueue { assertTrue(queue.isActiveQueueEmpty()); } + @Test(timeout = 10000) + public void testBackPressureAfterDrop() throws InterruptedException { + queue.setBackPressureObjectThreshold(10); + queue.setFlowFileExpiration("10 millis"); + + for (int i = 0; i < 9; i++) { + queue.put(new TestFlowFile()); + assertFalse(queue.isFull()); + } + + queue.put(new TestFlowFile()); + assertTrue(queue.isFull()); + + Thread.sleep(100L); + + final String requestId = UUID.randomUUID().toString(); + final DropFlowFileStatus status = queue.dropFlowFiles(requestId, "Unit Test"); + + while (status.getState() != DropFlowFileState.COMPLETE) { + Thread.sleep(10L); + } + + assertFalse(queue.isFull()); + assertTrue(queue.isEmpty()); + assertTrue(queue.isActiveQueueEmpty()); + + assertEquals(10, provRecords.size()); + for (final ProvenanceEventRecord event : provRecords) { + assertNotNull(event); + assertEquals(ProvenanceEventType.DROP, event.getEventType()); + } + } + @Test public void testBackPressureAfterPollSingle() throws InterruptedException { queue.setBackPressureObjectThreshold(10);