NIFI-1155: Ensure that when poll(FlowFileFilter, Set) is called, we properly update the indicator for whether or not queue is full

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2015-11-12 07:48:57 -05:00 committed by joewitt
parent e6086420aa
commit 3ed0949c55
2 changed files with 150 additions and 14 deletions

View File

@ -80,19 +80,30 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class);
private PriorityQueue<FlowFileRecord> activeQueue = null;
// guarded by lock
private ArrayList<FlowFileRecord> swapQueue = null;
private final AtomicReference<FlowFileQueueSize> size = new AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0L));
private boolean swapMode = false;
// TODO: Need to create a single object that houses these 3 and then create an AtomicReference for it and use a CAS operation to set it.
private volatile String maximumQueueDataSize;
private volatile long maximumQueueByteCount;
private volatile long maximumQueueObjectCount;
private final EventReporter eventReporter;
// TODO: Need to create a single object that houses these 2 and then create an AtomicReference for it and use CAS operation to set it.
private final AtomicLong flowFileExpirationMillis;
private final Connection connection;
private final AtomicReference<String> flowFileExpirationPeriod;
// TODO: Need to eliminate this all together. Since we are not locking on the size, can just get the size and compare to max
private final AtomicBoolean queueFullRef = new AtomicBoolean(false);
// TODO: Unit test better!
private final EventReporter eventReporter;
private final Connection connection;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private final List<FlowFilePrioritizer> priorities;
private final int swapThreshold;
@ -106,8 +117,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private final ProvenanceEventRepository provRepository;
private final ResourceClaimManager resourceClaimManager;
private final AtomicBoolean queueFullRef = new AtomicBoolean(false);
// SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK!
private final ProcessScheduler scheduler;
@ -683,13 +692,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
long bytesPulled = 0L;
int flowFilesPulled = 0;
boolean queueFullAtStart = false;
writeLock.lock();
try {
migrateSwapToActive();
final long expirationMillis = this.flowFileExpirationMillis.get();
final boolean queueFullAtStart = queueFullRef.get();
queueFullAtStart = queueFullRef.get();
final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
final List<FlowFileRecord> unselected = new ArrayList<>();
@ -735,17 +745,20 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
this.activeQueue.addAll(unselected);
// if at least 1 FlowFile was expired & the queue was full before we started, then
// we need to determine whether or not the queue is full again. If no FlowFile was expired,
// then the queue will still be full until the appropriate #acknowledge method is called.
if (queueFullAtStart && !expiredRecords.isEmpty()) {
queueFullRef.set(determineIfFull());
}
return selectedFlowFiles;
} finally {
incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
writeLock.unlock("poll(Filter, Set)");
try {
incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
// if at least 1 FlowFile was expired & the queue was full before we started, then
// we need to determine whether or not the queue is full again. If no FlowFile was expired,
// then the queue will still be full until the appropriate #acknowledge method is called.
if (queueFullAtStart && !expiredRecords.isEmpty()) {
queueFullRef.set(determineIfFull());
}
} finally {
writeLock.unlock("poll(Filter, Set)");
}
}
}

View File

@ -18,6 +18,7 @@
package org.apache.nifi.controller;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -48,6 +49,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.junit.Before;
@ -106,6 +108,127 @@ public class TestStandardFlowFileQueue {
assertEquals(0L, unackSize.getByteCount());
}
@Test
public void testBackPressure() {
queue.setBackPressureObjectThreshold(10);
assertTrue(queue.isEmpty());
assertTrue(queue.isActiveQueueEmpty());
assertFalse(queue.isFull());
for (int i = 0; i < 9; i++) {
queue.put(new TestFlowFile());
assertFalse(queue.isFull());
assertFalse(queue.isEmpty());
assertFalse(queue.isActiveQueueEmpty());
}
queue.put(new TestFlowFile());
assertTrue(queue.isFull());
assertFalse(queue.isEmpty());
assertFalse(queue.isActiveQueueEmpty());
final Set<FlowFileRecord> expiredRecords = new HashSet<>();
final FlowFileRecord polled = queue.poll(expiredRecords);
assertNotNull(polled);
assertTrue(expiredRecords.isEmpty());
assertFalse(queue.isEmpty());
assertFalse(queue.isActiveQueueEmpty());
// queue is still full because FlowFile has not yet been acknowledged.
assertTrue(queue.isFull());
queue.acknowledge(polled);
// FlowFile has been acknowledged; queue should no longer be full.
assertFalse(queue.isFull());
assertFalse(queue.isEmpty());
assertFalse(queue.isActiveQueueEmpty());
}
@Test
public void testBackPressureAfterPollFilter() throws InterruptedException {
queue.setBackPressureObjectThreshold(10);
queue.setFlowFileExpiration("10 millis");
for (int i = 0; i < 9; i++) {
queue.put(new TestFlowFile());
assertFalse(queue.isFull());
}
queue.put(new TestFlowFile());
assertTrue(queue.isFull());
Thread.sleep(100L);
final FlowFileFilter filter = new FlowFileFilter() {
@Override
public FlowFileFilterResult filter(final FlowFile flowFile) {
return FlowFileFilterResult.REJECT_AND_CONTINUE;
}
};
final Set<FlowFileRecord> expiredRecords = new HashSet<>();
final List<FlowFileRecord> polled = queue.poll(filter, expiredRecords);
assertTrue(polled.isEmpty());
assertEquals(10, expiredRecords.size());
assertFalse(queue.isFull());
assertTrue(queue.isEmpty());
assertTrue(queue.isActiveQueueEmpty());
}
@Test
public void testBackPressureAfterPollSingle() throws InterruptedException {
queue.setBackPressureObjectThreshold(10);
queue.setFlowFileExpiration("10 millis");
for (int i = 0; i < 9; i++) {
queue.put(new TestFlowFile());
assertFalse(queue.isFull());
}
queue.put(new TestFlowFile());
assertTrue(queue.isFull());
Thread.sleep(100L);
final Set<FlowFileRecord> expiredRecords = new HashSet<>();
final FlowFileRecord polled = queue.poll(expiredRecords);
assertNull(polled);
assertEquals(10, expiredRecords.size());
assertFalse(queue.isFull());
assertTrue(queue.isEmpty());
assertTrue(queue.isActiveQueueEmpty());
}
@Test
public void testBackPressureAfterPollMultiple() throws InterruptedException {
queue.setBackPressureObjectThreshold(10);
queue.setFlowFileExpiration("10 millis");
for (int i = 0; i < 9; i++) {
queue.put(new TestFlowFile());
assertFalse(queue.isFull());
}
queue.put(new TestFlowFile());
assertTrue(queue.isFull());
Thread.sleep(100L);
final Set<FlowFileRecord> expiredRecords = new HashSet<>();
final List<FlowFileRecord> polled = queue.poll(10, expiredRecords);
assertTrue(polled.isEmpty());
assertEquals(10, expiredRecords.size());
assertFalse(queue.isFull());
assertTrue(queue.isEmpty());
assertTrue(queue.isActiveQueueEmpty());
}
@Test
public void testSwapOutOccurs() {
for (int i = 0; i < 10000; i++) {