NIFI-1155: Refactored StandardFlowFileQueue to update member variables more intelligently, using CAS operations instead of locks. This reduces code complexities because other optimizations that previously existed are no longer needed

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2015-11-12 08:36:38 -05:00 committed by joewitt
parent 3ed0949c55
commit 37d6b7350e
2 changed files with 139 additions and 114 deletions

View File

@ -32,8 +32,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -88,19 +86,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private boolean swapMode = false; private boolean swapMode = false;
// TODO: Need to create a single object that houses these 3 and then create an AtomicReference for it and use a CAS operation to set it. private final AtomicReference<MaxQueueSize> maxQueueSize = new AtomicReference<>(new MaxQueueSize("0 MB", 0L, 0L));
private volatile String maximumQueueDataSize; private final AtomicReference<TimePeriod> expirationPeriod = new AtomicReference<>(new TimePeriod("0 mins", 0L));
private volatile long maximumQueueByteCount;
private volatile long maximumQueueObjectCount;
// TODO: Need to create a single object that houses these 2 and then create an AtomicReference for it and use CAS operation to set it.
private final AtomicLong flowFileExpirationMillis;
private final AtomicReference<String> flowFileExpirationPeriod;
// TODO: Need to eliminate this all together. Since we are not locking on the size, can just get the size and compare to max
private final AtomicBoolean queueFullRef = new AtomicBoolean(false);
// TODO: Unit test better!
private final EventReporter eventReporter; private final EventReporter eventReporter;
private final Connection connection; private final Connection connection;
@ -124,11 +111,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) { final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) {
activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>())); activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
priorities = new ArrayList<>(); priorities = new ArrayList<>();
maximumQueueObjectCount = 0L;
maximumQueueDataSize = "0 MB";
maximumQueueByteCount = 0L;
flowFileExpirationMillis = new AtomicLong(0);
flowFileExpirationPeriod = new AtomicReference<>("0 mins");
swapQueue = new ArrayList<>(); swapQueue = new ArrayList<>();
this.eventReporter = eventReporter; this.eventReporter = eventReporter;
this.swapManager = swapManager; this.swapManager = swapManager;
@ -170,36 +152,35 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
} }
@Override @Override
public void setBackPressureObjectThreshold(final long maxQueueSize) { public void setBackPressureObjectThreshold(final long threshold) {
writeLock.lock(); boolean updated = false;
try { while (!updated) {
maximumQueueObjectCount = maxQueueSize; MaxQueueSize maxSize = maxQueueSize.get();
this.queueFullRef.set(determineIfFull()); final MaxQueueSize updatedSize = new MaxQueueSize(maxSize.getMaxSize(), maxSize.getMaxBytes(), threshold);
} finally { updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
writeLock.unlock("setBackPressureObjectThreshold");
} }
} }
@Override @Override
public long getBackPressureObjectThreshold() { public long getBackPressureObjectThreshold() {
return maximumQueueObjectCount; return maxQueueSize.get().getMaxCount();
} }
@Override @Override
public void setBackPressureDataSizeThreshold(final String maxDataSize) { public void setBackPressureDataSizeThreshold(final String maxDataSize) {
writeLock.lock(); final long maxBytes = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue();
try {
maximumQueueByteCount = DataUnit.parseDataSize(maxDataSize, DataUnit.B).longValue(); boolean updated = false;
maximumQueueDataSize = maxDataSize; while (!updated) {
this.queueFullRef.set(determineIfFull()); MaxQueueSize maxSize = maxQueueSize.get();
} finally { final MaxQueueSize updatedSize = new MaxQueueSize(maxDataSize, maxBytes, maxSize.getMaxCount());
writeLock.unlock("setBackPressureDataSizeThreshold"); updated = maxQueueSize.compareAndSet(maxSize, updatedSize);
} }
} }
@Override @Override
public String getBackPressureDataSizeThreshold() { public String getBackPressureDataSizeThreshold() {
return maximumQueueDataSize; return maxQueueSize.get().getMaxSize();
} }
@Override @Override
@ -229,17 +210,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override @Override
public void acknowledge(final FlowFileRecord flowFile) { public void acknowledge(final FlowFileRecord flowFile) {
if (queueFullRef.get()) { incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
writeLock.lock();
try {
incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
queueFullRef.set(determineIfFull());
} finally {
writeLock.unlock("acknowledge(FlowFileRecord)");
}
} else {
incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
}
if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
// queue was full but no longer is. Notify that the source may now be available to run, // queue was full but no longer is. Notify that the source may now be available to run,
@ -255,17 +226,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
totalSize += flowFile.getSize(); totalSize += flowFile.getSize();
} }
if (queueFullRef.get()) { incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
writeLock.lock();
try {
incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
queueFullRef.set(determineIfFull());
} finally {
writeLock.unlock("acknowledge(FlowFileRecord)");
}
} else {
incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
}
if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
// it's possible that queue was full but no longer is. Notify that the source may now be available to run, // it's possible that queue was full but no longer is. Notify that the source may now be available to run,
@ -276,33 +237,26 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override @Override
public boolean isFull() { public boolean isFull() {
return queueFullRef.get(); final MaxQueueSize maxSize = maxQueueSize.get();
}
/** // Check if max size is set
* MUST be called with either the read or write lock held if (maxSize.getMaxBytes() <= 0 && maxSize.getMaxCount() <= 0) {
*
* @return true if full
*/
private boolean determineIfFull() {
final long maxSize = maximumQueueObjectCount;
final long maxBytes = maximumQueueByteCount;
if (maxSize <= 0 && maxBytes <= 0) {
return false; return false;
} }
final QueueSize queueSize = getQueueSize(); final QueueSize queueSize = getQueueSize();
if (maxSize > 0 && queueSize.getObjectCount() >= maxSize) { if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= maxSize.getMaxCount()) {
return true; return true;
} }
if (maxBytes > 0 && queueSize.getByteCount() >= maxBytes) { if (maxSize.getMaxBytes() > 0 && queueSize.getByteCount() >= maxSize.getMaxBytes()) {
return true; return true;
} }
return false; return false;
} }
@Override @Override
public void put(final FlowFileRecord file) { public void put(final FlowFileRecord file) {
writeLock.lock(); writeLock.lock();
@ -316,8 +270,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
incrementActiveQueueSize(1, file.getSize()); incrementActiveQueueSize(1, file.getSize());
activeQueue.add(file); activeQueue.add(file);
} }
queueFullRef.set(determineIfFull());
} finally { } finally {
writeLock.unlock("put(FlowFileRecord)"); writeLock.unlock("put(FlowFileRecord)");
} }
@ -346,8 +298,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
incrementActiveQueueSize(numFiles, bytes); incrementActiveQueueSize(numFiles, bytes);
activeQueue.addAll(files); activeQueue.addAll(files);
} }
queueFullRef.set(determineIfFull());
} finally { } finally {
writeLock.unlock("putAll"); writeLock.unlock("putAll");
} }
@ -383,7 +333,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
FlowFileRecord flowFile = null; FlowFileRecord flowFile = null;
// First check if we have any records Pre-Fetched. // First check if we have any records Pre-Fetched.
final long expirationMillis = flowFileExpirationMillis.get(); final long expirationMillis = expirationPeriod.get().getMillis();
writeLock.lock(); writeLock.lock();
try { try {
flowFile = doPoll(expiredRecords, expirationMillis); flowFile = doPoll(expiredRecords, expirationMillis);
@ -402,10 +352,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
boolean isExpired; boolean isExpired;
migrateSwapToActive(); migrateSwapToActive();
final boolean queueFullAtStart = queueFullRef.get();
long expiredBytes = 0L; long expiredBytes = 0L;
do { do {
flowFile = this.activeQueue.poll(); flowFile = this.activeQueue.poll();
@ -433,13 +381,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes); incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes);
} }
// if at least 1 FlowFile was expired & the queue was full before we started, then
// we need to determine whether or not the queue is full again. If no FlowFile was expired,
// then the queue will still be full until the appropriate #acknowledge method is called.
if (queueFullAtStart && !expiredRecords.isEmpty()) {
queueFullRef.set(determineIfFull());
}
return flowFile; return flowFile;
} }
@ -460,8 +401,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords) { private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords) {
migrateSwapToActive(); migrateSwapToActive();
final boolean queueFullAtStart = queueFullRef.get();
final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords); final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords);
long expiredBytes = 0L; long expiredBytes = 0L;
@ -471,13 +410,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
incrementActiveQueueSize(-(expiredRecords.size() + records.size()), -bytesDrained); incrementActiveQueueSize(-(expiredRecords.size() + records.size()), -bytesDrained);
incrementUnacknowledgedQueueSize(records.size(), bytesDrained - expiredBytes); incrementUnacknowledgedQueueSize(records.size(), bytesDrained - expiredBytes);
// if at least 1 FlowFile was expired & the queue was full before we started, then
// we need to determine whether or not the queue is full again. If no FlowFile was expired,
// then the queue will still be full until the appropriate #acknowledge method is called.
if (queueFullAtStart && !expiredRecords.isEmpty()) {
queueFullRef.set(determineIfFull());
}
} }
/** /**
@ -669,7 +601,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
long drainedSize = 0L; long drainedSize = 0L;
FlowFileRecord pulled = null; FlowFileRecord pulled = null;
final long expirationMillis = this.flowFileExpirationMillis.get(); final long expirationMillis = expirationPeriod.get().getMillis();
while (destination.size() < maxResults && (pulled = sourceQueue.poll()) != null) { while (destination.size() < maxResults && (pulled = sourceQueue.poll()) != null) {
if (isLaterThan(getExpirationDate(pulled, expirationMillis))) { if (isLaterThan(getExpirationDate(pulled, expirationMillis))) {
expiredRecords.add(pulled); expiredRecords.add(pulled);
@ -692,14 +624,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) { public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
long bytesPulled = 0L; long bytesPulled = 0L;
int flowFilesPulled = 0; int flowFilesPulled = 0;
boolean queueFullAtStart = false;
writeLock.lock(); writeLock.lock();
try { try {
migrateSwapToActive(); migrateSwapToActive();
final long expirationMillis = this.flowFileExpirationMillis.get(); final long expirationMillis = expirationPeriod.get().getMillis();
queueFullAtStart = queueFullRef.get();
final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>(); final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>();
final List<FlowFileRecord> unselected = new ArrayList<>(); final List<FlowFileRecord> unselected = new ArrayList<>();
@ -744,21 +674,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
} }
this.activeQueue.addAll(unselected); this.activeQueue.addAll(unselected);
incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
return selectedFlowFiles; return selectedFlowFiles;
} finally { } finally {
try { writeLock.unlock("poll(Filter, Set)");
incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
// if at least 1 FlowFile was expired & the queue was full before we started, then
// we need to determine whether or not the queue is full again. If no FlowFile was expired,
// then the queue will still be full until the appropriate #acknowledge method is called.
if (queueFullAtStart && !expiredRecords.isEmpty()) {
queueFullRef.set(determineIfFull());
}
} finally {
writeLock.unlock("poll(Filter, Set)");
}
} }
} }
@ -830,12 +750,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override @Override
public String getFlowFileExpiration() { public String getFlowFileExpiration() {
return flowFileExpirationPeriod.get(); return expirationPeriod.get().getPeriod();
} }
@Override @Override
public int getFlowFileExpiration(final TimeUnit timeUnit) { public int getFlowFileExpiration(final TimeUnit timeUnit) {
return (int) timeUnit.convert(flowFileExpirationMillis.get(), TimeUnit.MILLISECONDS); return (int) timeUnit.convert(expirationPeriod.get().getMillis(), TimeUnit.MILLISECONDS);
} }
@Override @Override
@ -844,8 +764,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
if (millis < 0) { if (millis < 0) {
throw new IllegalArgumentException("FlowFile Expiration Period must be positive"); throw new IllegalArgumentException("FlowFile Expiration Period must be positive");
} }
this.flowFileExpirationPeriod.set(flowExpirationPeriod);
this.flowFileExpirationMillis.set(millis); expirationPeriod.set(new TimePeriod(flowExpirationPeriod, millis));
} }
@ -1300,4 +1220,57 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
" Bytes], Unacknowledged=[" + unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]"; " Bytes], Unacknowledged=[" + unacknowledgedCount + ", " + unacknowledgedBytes + " Bytes] ]";
} }
} }
private static class MaxQueueSize {
private final String maxSize;
private final long maxBytes;
private final long maxCount;
public MaxQueueSize(final String maxSize, final long maxBytes, final long maxCount) {
this.maxSize = maxSize;
this.maxBytes = maxBytes;
this.maxCount = maxCount;
}
public String getMaxSize() {
return maxSize;
}
public long getMaxBytes() {
return maxBytes;
}
public long getMaxCount() {
return maxCount;
}
@Override
public String toString() {
return maxCount + " Objects/" + maxSize;
}
}
private static class TimePeriod {
private final String period;
private final long millis;
public TimePeriod(final String period, final long millis) {
this.period = period;
this.millis = millis;
}
public String getPeriod() {
return period;
}
public long getMillis() {
return millis;
}
@Override
public String toString() {
return period;
}
}
} }

View File

@ -50,18 +50,27 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestStandardFlowFileQueue { public class TestStandardFlowFileQueue {
private TestSwapManager swapManager = null; private TestSwapManager swapManager = null;
private StandardFlowFileQueue queue = null; private StandardFlowFileQueue queue = null;
private List<ProvenanceEventRecord> provRecords = new ArrayList<>();
@Before @Before
@SuppressWarnings("unchecked")
public void setup() { public void setup() {
provRecords.clear();
final Connection connection = Mockito.mock(Connection.class); final Connection connection = Mockito.mock(Connection.class);
Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class)); Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class));
Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class)); Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
@ -74,6 +83,16 @@ public class TestStandardFlowFileQueue {
final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class); final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class);
Mockito.when(provRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder()); Mockito.when(provRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder());
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
final Iterable<ProvenanceEventRecord> iterable = (Iterable<ProvenanceEventRecord>) invocation.getArguments()[0];
for (final ProvenanceEventRecord record : iterable) {
provRecords.add(record);
}
return null;
}
}).when(provRepo).registerEvents(Mockito.any(Iterable.class));
queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000); queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000);
TestFlowFile.idGenerator.set(0L); TestFlowFile.idGenerator.set(0L);
@ -179,6 +198,39 @@ public class TestStandardFlowFileQueue {
assertTrue(queue.isActiveQueueEmpty()); assertTrue(queue.isActiveQueueEmpty());
} }
@Test(timeout = 10000)
public void testBackPressureAfterDrop() throws InterruptedException {
queue.setBackPressureObjectThreshold(10);
queue.setFlowFileExpiration("10 millis");
for (int i = 0; i < 9; i++) {
queue.put(new TestFlowFile());
assertFalse(queue.isFull());
}
queue.put(new TestFlowFile());
assertTrue(queue.isFull());
Thread.sleep(100L);
final String requestId = UUID.randomUUID().toString();
final DropFlowFileStatus status = queue.dropFlowFiles(requestId, "Unit Test");
while (status.getState() != DropFlowFileState.COMPLETE) {
Thread.sleep(10L);
}
assertFalse(queue.isFull());
assertTrue(queue.isEmpty());
assertTrue(queue.isActiveQueueEmpty());
assertEquals(10, provRecords.size());
for (final ProvenanceEventRecord event : provRecords) {
assertNotNull(event);
assertEquals(ProvenanceEventType.DROP, event.getEventType());
}
}
@Test @Test
public void testBackPressureAfterPollSingle() throws InterruptedException { public void testBackPressureAfterPollSingle() throws InterruptedException {
queue.setBackPressureObjectThreshold(10); queue.setBackPressureObjectThreshold(10);