NIFI-730: bug fixes and code cleanup for swap manager and flowfile queue

This commit is contained in:
Mark Payne 2015-10-14 09:14:15 -04:00
parent afb76afcd0
commit 77f7d7524c
5 changed files with 214 additions and 110 deletions

View File

@ -40,16 +40,6 @@ public interface FlowFileSwapManager {
*/ */
void initialize(SwapManagerInitializationContext initializationContext); void initialize(SwapManagerInitializationContext initializationContext);
/**
* Drops all FlowFiles that are swapped out at the given location. This will update the Provenance
* Repository as well as the FlowFile Repository and
*
* @param swapLocation the location of the swap file to drop
* @param flowFileQueue the queue to which the FlowFiles belong
* @param user the user that initiated the request
*/
void dropSwappedFlowFiles(String swapLocation, FlowFileQueue flowFileQueue, String user) throws IOException;
/** /**
* Swaps out the given FlowFiles that belong to the queue with the given identifier. * Swaps out the given FlowFiles that belong to the queue with the given identifier.
* *

View File

@ -64,7 +64,7 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
} }
void setCurrentSize(final QueueSize queueSize) { void setCurrentSize(final QueueSize queueSize) {
this.currentSize = currentSize; this.currentSize = queueSize;
} }
@Override @Override

View File

@ -173,11 +173,6 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
} }
} }
@Override
public void dropSwappedFlowFiles(final String swapLocation, final FlowFileQueue flowFileQueue, final String user) throws IOException {
}
@Override @Override
public List<String> recoverSwapLocations(final FlowFileQueue flowFileQueue) throws IOException { public List<String> recoverSwapLocations(final FlowFileQueue flowFileQueue) throws IOException {

View File

@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -81,11 +80,16 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class); private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class);
private PriorityQueue<FlowFileRecord> activeQueue = null; private PriorityQueue<FlowFileRecord> activeQueue = null;
private long activeQueueContentSize = 0L;
private ArrayList<FlowFileRecord> swapQueue = null; private ArrayList<FlowFileRecord> swapQueue = null;
private int swappedRecordCount = 0; // private final AtomicInteger activeQueueSizeRef = new AtomicInteger(0);
private long swappedContentSize = 0L; // private long activeQueueContentSize = 0L;
// private int swappedRecordCount = 0;
// private long swappedContentSize = 0L;
// private final AtomicReference<QueueSize> unacknowledgedSizeRef = new AtomicReference<>(new QueueSize(0, 0L));
private final AtomicReference<FlowFileQueueSize> size = new AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0L));
private String maximumQueueDataSize; private String maximumQueueDataSize;
private long maximumQueueByteCount; private long maximumQueueByteCount;
private boolean swapMode = false; private boolean swapMode = false;
@ -108,8 +112,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private final ResourceClaimManager resourceClaimManager; private final ResourceClaimManager resourceClaimManager;
private final AtomicBoolean queueFullRef = new AtomicBoolean(false); private final AtomicBoolean queueFullRef = new AtomicBoolean(false);
private final AtomicInteger activeQueueSizeRef = new AtomicInteger(0);
private final AtomicReference<QueueSize> unacknowledgedSizeRef = new AtomicReference<>(new QueueSize(0, 0L));
// SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK! // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK!
private final ProcessScheduler scheduler; private final ProcessScheduler scheduler;
@ -208,49 +210,26 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override @Override
public QueueSize size() { public QueueSize size() {
readLock.lock(); return getQueueSize();
try {
return getQueueSize();
} finally {
readLock.unlock("getSize");
}
} }
/**
* MUST be called with lock held
*
* @return size of queue
*/
private QueueSize getQueueSize() {
final QueueSize unacknowledged = unacknowledgedSizeRef.get();
return new QueueSize(activeQueue.size() + swappedRecordCount + unacknowledged.getObjectCount(), private QueueSize getQueueSize() {
activeQueueContentSize + swappedContentSize + unacknowledged.getByteCount()); return size.get().toQueueSize();
} }
@Override @Override
public boolean isEmpty() { public boolean isEmpty() {
readLock.lock(); return size.get().isEmpty();
try {
return activeQueue.isEmpty() && swappedRecordCount == 0 && unacknowledgedSizeRef.get().getObjectCount() == 0;
} finally {
readLock.unlock("isEmpty");
}
} }
@Override @Override
public boolean isActiveQueueEmpty() { public boolean isActiveQueueEmpty() {
final int activeQueueSize = activeQueueSizeRef.get(); return size.get().activeQueueCount == 0;
return activeQueueSize == 0;
} }
public QueueSize getActiveQueueSize() { public QueueSize getActiveQueueSize() {
readLock.lock(); return size.get().activeQueueSize();
try {
return new QueueSize(activeQueue.size(), activeQueueContentSize);
} finally {
readLock.unlock("getActiveQueueSize");
}
} }
@Override @Override
@ -258,13 +237,13 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
if (queueFullRef.get()) { if (queueFullRef.get()) {
writeLock.lock(); writeLock.lock();
try { try {
updateUnacknowledgedSize(-1, -flowFile.getSize()); incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
queueFullRef.set(determineIfFull()); queueFullRef.set(determineIfFull());
} finally { } finally {
writeLock.unlock("acknowledge(FlowFileRecord)"); writeLock.unlock("acknowledge(FlowFileRecord)");
} }
} else { } else {
updateUnacknowledgedSize(-1, -flowFile.getSize()); incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
} }
if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
@ -284,13 +263,13 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
if (queueFullRef.get()) { if (queueFullRef.get()) {
writeLock.lock(); writeLock.lock();
try { try {
updateUnacknowledgedSize(-flowFiles.size(), -totalSize); incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
queueFullRef.set(determineIfFull()); queueFullRef.set(determineIfFull());
} finally { } finally {
writeLock.unlock("acknowledge(FlowFileRecord)"); writeLock.unlock("acknowledge(FlowFileRecord)");
} }
} else { } else {
updateUnacknowledgedSize(-flowFiles.size(), -totalSize); incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
} }
if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
@ -335,18 +314,16 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
try { try {
if (swapMode || activeQueue.size() >= swapThreshold) { if (swapMode || activeQueue.size() >= swapThreshold) {
swapQueue.add(file); swapQueue.add(file);
swappedContentSize += file.getSize(); incrementSwapQueueSize(1, file.getSize());
swappedRecordCount++;
swapMode = true; swapMode = true;
writeSwapFilesIfNecessary(); writeSwapFilesIfNecessary();
} else { } else {
activeQueueContentSize += file.getSize(); incrementActiveQueueSize(1, file.getSize());
activeQueue.add(file); activeQueue.add(file);
} }
queueFullRef.set(determineIfFull()); queueFullRef.set(determineIfFull());
} finally { } finally {
activeQueueSizeRef.set(activeQueue.size());
writeLock.unlock("put(FlowFileRecord)"); writeLock.unlock("put(FlowFileRecord)");
} }
@ -367,18 +344,16 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
try { try {
if (swapMode || activeQueue.size() >= swapThreshold - numFiles) { if (swapMode || activeQueue.size() >= swapThreshold - numFiles) {
swapQueue.addAll(files); swapQueue.addAll(files);
swappedContentSize += bytes; incrementSwapQueueSize(numFiles, bytes);
swappedRecordCount += numFiles;
swapMode = true; swapMode = true;
writeSwapFilesIfNecessary(); writeSwapFilesIfNecessary();
} else { } else {
activeQueueContentSize += bytes; incrementActiveQueueSize(numFiles, bytes);
activeQueue.addAll(files); activeQueue.addAll(files);
} }
queueFullRef.set(determineIfFull()); queueFullRef.set(determineIfFull());
} finally { } finally {
activeQueueSizeRef.set(activeQueue.size());
writeLock.unlock("putAll"); writeLock.unlock("putAll");
} }
@ -419,11 +394,10 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
flowFile = doPoll(expiredRecords, expirationMillis); flowFile = doPoll(expiredRecords, expirationMillis);
return flowFile; return flowFile;
} finally { } finally {
activeQueueSizeRef.set(activeQueue.size());
writeLock.unlock("poll(Set)"); writeLock.unlock("poll(Set)");
if (flowFile != null) { if (flowFile != null) {
updateUnacknowledgedSize(1, flowFile.getSize()); incrementUnacknowledgedQueueSize(1, flowFile.getSize());
} }
} }
} }
@ -435,14 +409,19 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
migrateSwapToActive(); migrateSwapToActive();
final boolean queueFullAtStart = queueFullRef.get(); final boolean queueFullAtStart = queueFullRef.get();
int expiredRecordCount = 0;
long expiredBytes = 0L;
do { do {
flowFile = this.activeQueue.poll(); flowFile = this.activeQueue.poll();
isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis)); isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis));
if (isExpired) { if (isExpired) {
expiredRecords.add(flowFile); expiredRecords.add(flowFile);
expiredRecordCount++;
expiredBytes += flowFile.getSize();
if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) { if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
activeQueueContentSize -= flowFile.getSize();
break; break;
} }
} else if (flowFile != null && flowFile.isPenalized()) { } else if (flowFile != null && flowFile.isPenalized()) {
@ -452,7 +431,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
} }
if (flowFile != null) { if (flowFile != null) {
activeQueueContentSize -= flowFile.getSize(); incrementActiveQueueSize(-1, -flowFile.getSize());
}
if (expiredRecordCount > 0) {
incrementActiveQueueSize(-expiredRecordCount, -expiredBytes);
} }
} while (isExpired); } while (isExpired);
@ -475,7 +458,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
try { try {
doPoll(records, maxResults, expiredRecords); doPoll(records, maxResults, expiredRecords);
} finally { } finally {
activeQueueSizeRef.set(activeQueue.size());
writeLock.unlock("poll(int, Set)"); writeLock.unlock("poll(int, Set)");
} }
return records; return records;
@ -493,8 +475,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
expiredBytes += record.getSize(); expiredBytes += record.getSize();
} }
activeQueueContentSize -= bytesDrained; incrementActiveQueueSize(-(expiredRecords.size() + records.size()), -bytesDrained);
updateUnacknowledgedSize(records.size(), bytesDrained - expiredBytes); incrementUnacknowledgedQueueSize(records.size(), bytesDrained - expiredBytes);
// if at least 1 FlowFile was expired & the queue was full before we started, then // if at least 1 FlowFile was expired & the queue was full before we started, then
// we need to determine whether or not the queue is full again. If no FlowFile was expired, // we need to determine whether or not the queue is full again. If no FlowFile was expired,
@ -538,14 +520,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final String swapLocation = swapLocations.remove(0); final String swapLocation = swapLocations.remove(0);
try { try {
final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, this); final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, this);
swappedRecordCount -= swappedIn.size();
long swapSize = 0L; long swapSize = 0L;
for (final FlowFileRecord flowFile : swappedIn) { for (final FlowFileRecord flowFile : swappedIn) {
swapSize += flowFile.getSize(); swapSize += flowFile.getSize();
} }
swappedContentSize -= swapSize; incrementSwapQueueSize(-swappedIn.size(), -swapSize);
activeQueueContentSize += swapSize; incrementActiveQueueSize(swappedIn.size(), swapSize);
activeQueueSizeRef.set(activeQueue.size());
activeQueue.addAll(swappedIn); activeQueue.addAll(swappedIn);
return; return;
} catch (final FileNotFoundException fnfe) { } catch (final FileNotFoundException fnfe) {
@ -567,28 +547,33 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
// this is the most common condition (nothing is swapped out), so do the check first and avoid the expense // this is the most common condition (nothing is swapped out), so do the check first and avoid the expense
// of other checks for 99.999% of the cases. // of other checks for 99.999% of the cases.
if (swappedRecordCount == 0 && swapQueue.isEmpty()) { if (size.get().swappedCount == 0 && swapQueue.isEmpty()) {
return; return;
} }
if (swappedRecordCount > swapQueue.size()) { if (size.get().swappedCount > swapQueue.size()) {
// we already have FlowFiles swapped out, so we won't migrate the queue; we will wait for // we already have FlowFiles swapped out, so we won't migrate the queue; we will wait for
// an external process to swap FlowFiles back in. // an external process to swap FlowFiles back in.
return; return;
} }
int recordsMigrated = 0;
long bytesMigrated = 0L;
final Iterator<FlowFileRecord> swapItr = swapQueue.iterator(); final Iterator<FlowFileRecord> swapItr = swapQueue.iterator();
while (activeQueue.size() < swapThreshold && swapItr.hasNext()) { while (activeQueue.size() < swapThreshold && swapItr.hasNext()) {
final FlowFileRecord toMigrate = swapItr.next(); final FlowFileRecord toMigrate = swapItr.next();
activeQueue.add(toMigrate); activeQueue.add(toMigrate);
activeQueueContentSize += toMigrate.getSize(); bytesMigrated += toMigrate.getSize();
swappedContentSize -= toMigrate.getSize(); recordsMigrated++;
swappedRecordCount--;
swapItr.remove(); swapItr.remove();
} }
if (swappedRecordCount == 0) { if (recordsMigrated > 0) {
incrementActiveQueueSize(recordsMigrated, bytesMigrated);
incrementSwapQueueSize(-recordsMigrated, -bytesMigrated);
}
if (size.get().swappedCount == 0) {
swapMode = false; swapMode = false;
} }
} }
@ -603,18 +588,29 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final int numSwapFiles = swapQueue.size() / SWAP_RECORD_POLL_SIZE; final int numSwapFiles = swapQueue.size() / SWAP_RECORD_POLL_SIZE;
int originalSwapQueueCount = swapQueue.size();
long originalSwapQueueBytes = 0L;
for (final FlowFileRecord flowFile : swapQueue) {
originalSwapQueueBytes += flowFile.getSize();
}
// Create a new Priority queue with the prioritizers that are set, but reverse the // Create a new Priority queue with the prioritizers that are set, but reverse the
// prioritizers because we want to pull the lowest-priority FlowFiles to swap out // prioritizers because we want to pull the lowest-priority FlowFiles to swap out
final PriorityQueue<FlowFileRecord> tempQueue = new PriorityQueue<>(activeQueue.size() + swapQueue.size(), Collections.reverseOrder(new Prioritizer(priorities))); final PriorityQueue<FlowFileRecord> tempQueue = new PriorityQueue<>(activeQueue.size() + swapQueue.size(), Collections.reverseOrder(new Prioritizer(priorities)));
tempQueue.addAll(activeQueue); tempQueue.addAll(activeQueue);
tempQueue.addAll(swapQueue); tempQueue.addAll(swapQueue);
long bytesSwappedOut = 0L;
int flowFilesSwappedOut = 0;
final List<String> swapLocations = new ArrayList<>(numSwapFiles); final List<String> swapLocations = new ArrayList<>(numSwapFiles);
for (int i = 0; i < numSwapFiles; i++) { for (int i = 0; i < numSwapFiles; i++) {
// Create a new swap file for the next SWAP_RECORD_POLL_SIZE records // Create a new swap file for the next SWAP_RECORD_POLL_SIZE records
final List<FlowFileRecord> toSwap = new ArrayList<>(SWAP_RECORD_POLL_SIZE); final List<FlowFileRecord> toSwap = new ArrayList<>(SWAP_RECORD_POLL_SIZE);
for (int j = 0; j < SWAP_RECORD_POLL_SIZE; j++) { for (int j = 0; j < SWAP_RECORD_POLL_SIZE; j++) {
toSwap.add(tempQueue.poll()); final FlowFileRecord flowFile = tempQueue.poll();
toSwap.add(flowFile);
bytesSwappedOut += flowFile.getSize();
flowFilesSwappedOut++;
} }
try { try {
@ -639,9 +635,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
// Pull any records off of the temp queue that won't fit back on the active queue, and add those to the // Pull any records off of the temp queue that won't fit back on the active queue, and add those to the
// swap queue. Then add the records back to the active queue. // swap queue. Then add the records back to the active queue.
swapQueue.clear(); swapQueue.clear();
long updatedSwapQueueBytes = 0L;
while (tempQueue.size() > swapThreshold) { while (tempQueue.size() > swapThreshold) {
final FlowFileRecord record = tempQueue.poll(); final FlowFileRecord record = tempQueue.poll();
swapQueue.add(record); swapQueue.add(record);
updatedSwapQueueBytes += record.getSize();
} }
Collections.reverse(swapQueue); // currently ordered in reverse priority order based on the ordering of the temp queue Collections.reverse(swapQueue); // currently ordered in reverse priority order based on the ordering of the temp queue
@ -649,9 +647,25 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
// replace the contents of the active queue, since we've merged it with the swap queue. // replace the contents of the active queue, since we've merged it with the swap queue.
activeQueue.clear(); activeQueue.clear();
FlowFileRecord toRequeue; FlowFileRecord toRequeue;
long activeQueueBytes = 0L;
while ((toRequeue = tempQueue.poll()) != null) { while ((toRequeue = tempQueue.poll()) != null) {
activeQueue.offer(toRequeue); activeQueue.offer(toRequeue);
activeQueueBytes += toRequeue.getSize();
} }
boolean updated = false;
while (!updated) {
final FlowFileQueueSize originalSize = size.get();
final int addedSwapRecords = swapQueue.size() - originalSwapQueueCount;
final long addedSwapBytes = updatedSwapQueueBytes - originalSwapQueueBytes;
final FlowFileQueueSize newSize = new FlowFileQueueSize(activeQueue.size(), activeQueueBytes,
originalSize.swappedCount + addedSwapRecords + flowFilesSwappedOut, originalSize.swappedBytes + addedSwapBytes + bytesSwappedOut,
originalSize.unacknowledgedCount, originalSize.unacknowledgedBytes);
updated = size.compareAndSet(originalSize, newSize);
}
this.swapLocations.addAll(swapLocations); this.swapLocations.addAll(swapLocations);
} }
@ -682,6 +696,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override @Override
public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) { public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
long bytesPulled = 0L;
int flowFilesPulled = 0;
writeLock.lock(); writeLock.lock();
try { try {
migrateSwapToActive(); migrateSwapToActive();
@ -701,7 +718,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final boolean isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis)); final boolean isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis));
if (isExpired) { if (isExpired) {
expiredRecords.add(flowFile); expiredRecords.add(flowFile);
activeQueueContentSize -= flowFile.getSize(); bytesPulled += flowFile.getSize();
flowFilesPulled++;
if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) { if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) {
break; break;
@ -716,9 +734,10 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final FlowFileFilterResult result = filter.filter(flowFile); final FlowFileFilterResult result = filter.filter(flowFile);
if (result.isAccept()) { if (result.isAccept()) {
activeQueueContentSize -= flowFile.getSize(); bytesPulled += flowFile.getSize();
flowFilesPulled++;
updateUnacknowledgedSize(1, flowFile.getSize()); incrementUnacknowledgedQueueSize(1, flowFile.getSize());
selectedFlowFiles.add(flowFile); selectedFlowFiles.add(flowFile);
} else { } else {
unselected.add(flowFile); unselected.add(flowFile);
@ -740,7 +759,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
return selectedFlowFiles; return selectedFlowFiles;
} finally { } finally {
activeQueueSizeRef.set(activeQueue.size()); incrementActiveQueueSize(-flowFilesPulled, -bytesPulled);
writeLock.unlock("poll(Filter, Set)"); writeLock.unlock("poll(Filter, Set)");
} }
} }
@ -880,8 +899,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
} }
} }
this.swappedRecordCount = swapFlowFileCount; incrementSwapQueueSize(swapFlowFileCount, swapByteCount);
this.swappedContentSize = swapByteCount;
this.swapLocations.addAll(swapLocations); this.swapLocations.addAll(swapLocations);
} finally { } finally {
writeLock.unlock("Recover Swap Files"); writeLock.unlock("Recover Swap Files");
@ -900,6 +918,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override @Override
public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, final String requestor) { public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, final String requestor) {
logger.info("Initiating drop of FlowFiles from {} on behalf of {} (request identifier={})", this, requestor, requestIdentifier);
// purge any old requests from the map just to keep it clean. But if there are very requests, which is usually the case, then don't bother // purge any old requests from the map just to keep it clean. But if there are very requests, which is usually the case, then don't bother
if (dropRequestMap.size() > 10) { if (dropRequestMap.size() > 10) {
final List<String> toDrop = new ArrayList<>(); final List<String> toDrop = new ArrayList<>();
@ -924,6 +944,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
writeLock.lock(); writeLock.lock();
try { try {
dropRequest.setState(DropFlowFileState.DROPPING_ACTIVE_FLOWFILES); dropRequest.setState(DropFlowFileState.DROPPING_ACTIVE_FLOWFILES);
logger.debug("For DropFlowFileRequest {}, original size is {}", requestIdentifier, getQueueSize());
dropRequest.setOriginalSize(getQueueSize()); dropRequest.setOriginalSize(getQueueSize());
try { try {
@ -932,6 +953,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
QueueSize droppedSize; QueueSize droppedSize;
try { try {
droppedSize = drop(activeQueueRecords, requestor); droppedSize = drop(activeQueueRecords, requestor);
logger.debug("For DropFlowFileRequest {}, Dropped {} from active queue", requestIdentifier, droppedSize);
} catch (final IOException ioe) { } catch (final IOException ioe) {
logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString()); logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
logger.error("", ioe); logger.error("", ioe);
@ -941,12 +963,15 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
} }
activeQueue.clear(); activeQueue.clear();
activeQueueContentSize = 0; incrementActiveQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount());
activeQueueSizeRef.set(0);
dropRequest.setCurrentSize(getQueueSize()); dropRequest.setCurrentSize(getQueueSize());
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
try { try {
final QueueSize swapSize = size.get().swapQueueSize();
logger.debug("For DropFlowFileRequest {}, Swap Queue has {} elements, Swapped Record Count = {}, Swapped Content Size = {}",
requestIdentifier, swapQueue.size(), swapSize.getObjectCount(), swapSize.getByteCount());
droppedSize = drop(swapQueue, requestor); droppedSize = drop(swapQueue, requestor);
} catch (final IOException ioe) { } catch (final IOException ioe) {
logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString()); logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
@ -960,9 +985,10 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
dropRequest.setCurrentSize(getQueueSize()); dropRequest.setCurrentSize(getQueueSize());
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
swapMode = false; swapMode = false;
swappedContentSize -= droppedSize.getByteCount(); incrementSwapQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount());
swappedRecordCount -= droppedSize.getObjectCount(); logger.debug("For DropFlowFileRequest {}, dropped {} from Swap Queue", requestIdentifier, droppedSize);
final int swapFileCount = swapLocations.size();
final Iterator<String> swapLocationItr = swapLocations.iterator(); final Iterator<String> swapLocationItr = swapLocations.iterator();
while (swapLocationItr.hasNext()) { while (swapLocationItr.hasNext()) {
final String swapLocation = swapLocationItr.next(); final String swapLocation = swapLocationItr.next();
@ -985,14 +1011,20 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
} }
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
swappedContentSize -= droppedSize.getByteCount(); incrementSwapQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount());
swappedRecordCount -= droppedSize.getObjectCount();
dropRequest.setCurrentSize(getQueueSize()); dropRequest.setCurrentSize(getQueueSize());
swapLocationItr.remove(); swapLocationItr.remove();
logger.debug("For DropFlowFileRequest {}, dropped {} for Swap File {}", requestIdentifier, droppedSize, swapLocation);
} }
logger.debug("Dropped FlowFiles from {} Swap Files", swapFileCount);
logger.info("Successfully dropped {} FlowFiles ({} bytes) from Connection with ID {} on behalf of {}",
dropRequest.getDroppedSize().getObjectCount(), dropRequest.getDroppedSize().getByteCount(), StandardFlowFileQueue.this.getIdentifier(), requestor);
dropRequest.setState(DropFlowFileState.COMPLETE); dropRequest.setState(DropFlowFileState.COMPLETE);
} catch (final Exception e) { } catch (final Exception e) {
logger.error("Failed to drop FlowFiles from Connection with ID {} due to {}", StandardFlowFileQueue.this.getIdentifier(), e.toString());
logger.error("", e);
dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + e.toString()); dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + e.toString());
} }
} finally { } finally {
@ -1020,6 +1052,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
long dropContentSize = 0L; long dropContentSize = 0L;
for (final FlowFileRecord flowFile : flowFiles) { for (final FlowFileRecord flowFile : flowFiles) {
dropContentSize += flowFile.getSize();
final ContentClaim contentClaim = flowFile.getContentClaim(); final ContentClaim contentClaim = flowFile.getContentClaim();
if (contentClaim == null) { if (contentClaim == null) {
continue; continue;
@ -1031,7 +1064,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
} }
resourceClaimManager.decrementClaimantCount(resourceClaim); resourceClaimManager.decrementClaimantCount(resourceClaim);
dropContentSize += flowFile.getSize();
} }
provRepository.registerEvents(provenanceEvents); provRepository.registerEvents(provenanceEvents);
@ -1138,16 +1170,77 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override @Override
public QueueSize getUnacknowledgedQueueSize() { public QueueSize getUnacknowledgedQueueSize() {
return unacknowledgedSizeRef.get(); return size.get().unacknowledgedQueueSize();
} }
private void updateUnacknowledgedSize(final int addToCount, final long addToSize) {
boolean updated = false;
do { private void incrementActiveQueueSize(final int count, final long bytes) {
final QueueSize queueSize = unacknowledgedSizeRef.get(); boolean updated = false;
final QueueSize newSize = new QueueSize(queueSize.getObjectCount() + addToCount, queueSize.getByteCount() + addToSize); while (!updated) {
updated = unacknowledgedSizeRef.compareAndSet(queueSize, newSize); final FlowFileQueueSize original = size.get();
} while (!updated); final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount + count, original.activeQueueBytes + bytes,
original.swappedCount, original.swappedBytes, original.unacknowledgedCount, original.unacknowledgedBytes);
updated = size.compareAndSet(original, newSize);
}
}
private void incrementSwapQueueSize(final int count, final long bytes) {
boolean updated = false;
while (!updated) {
final FlowFileQueueSize original = size.get();
final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount, original.activeQueueBytes,
original.swappedCount + count, original.swappedBytes + bytes, original.unacknowledgedCount, original.unacknowledgedBytes);
updated = size.compareAndSet(original, newSize);
}
}
private void incrementUnacknowledgedQueueSize(final int count, final long bytes) {
boolean updated = false;
while (!updated) {
final FlowFileQueueSize original = size.get();
final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount, original.activeQueueBytes,
original.swappedCount, original.swappedBytes, original.unacknowledgedCount + count, original.unacknowledgedBytes + bytes);
updated = size.compareAndSet(original, newSize);
}
}
private static class FlowFileQueueSize {
private final int activeQueueCount;
private final long activeQueueBytes;
private final int swappedCount;
private final long swappedBytes;
private final int unacknowledgedCount;
private final long unacknowledgedBytes;
public FlowFileQueueSize(final int activeQueueCount, final long activeQueueBytes, final int swappedCount, final long swappedBytes,
final int unacknowledgedCount, final long unacknowledgedBytes) {
this.activeQueueCount = activeQueueCount;
this.activeQueueBytes = activeQueueBytes;
this.swappedCount = swappedCount;
this.swappedBytes = swappedBytes;
this.unacknowledgedCount = unacknowledgedCount;
this.unacknowledgedBytes = unacknowledgedBytes;
}
public boolean isEmpty() {
return activeQueueCount == 0 && swappedCount == 0 && unacknowledgedCount == 0;
}
public QueueSize toQueueSize() {
return new QueueSize(activeQueueCount + swappedCount + unacknowledgedCount, activeQueueBytes + swappedBytes + unacknowledgedBytes);
}
public QueueSize activeQueueSize() {
return new QueueSize(activeQueueCount, activeQueueBytes);
}
public QueueSize unacknowledgedQueueSize() {
return new QueueSize(unacknowledgedCount, unacknowledgedBytes);
}
public QueueSize swapQueueSize() {
return new QueueSize(swappedCount, swappedBytes);
}
} }
} }

View File

@ -34,6 +34,8 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRecord;
@ -44,7 +46,9 @@ import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -66,6 +70,8 @@ public class TestStandardFlowFileQueue {
final ProvenanceEventRepository provRepo = Mockito.mock(ProvenanceEventRepository.class); final ProvenanceEventRepository provRepo = Mockito.mock(ProvenanceEventRepository.class);
final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class); final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class);
Mockito.when(provRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder());
queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000); queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000);
TestFlowFile.idGenerator.set(0L); TestFlowFile.idGenerator.set(0L);
} }
@ -148,16 +154,37 @@ public class TestStandardFlowFileQueue {
assertTrue(swapManager.swappedOut.isEmpty()); assertTrue(swapManager.swappedOut.isEmpty());
queue.poll(exp); queue.poll(exp);
} }
@Test
public void testDropSwappedFlowFiles() {
for (int i = 1; i <= 210000; i++) {
queue.put(new TestFlowFile());
}
assertEquals(20, swapManager.swappedOut.size());
final DropFlowFileStatus status = queue.dropFlowFiles("1", "Unit Test");
while (status.getState() != DropFlowFileState.COMPLETE) {
final QueueSize queueSize = queue.size();
System.out.println(queueSize);
try {
Thread.sleep(1000L);
} catch (final Exception e) {
}
}
System.out.println(queue.size());
assertEquals(0, queue.size().getObjectCount());
assertEquals(0, queue.size().getByteCount());
assertEquals(0, swapManager.swappedOut.size());
assertEquals(20, swapManager.swapInCalledCount);
}
private class TestSwapManager implements FlowFileSwapManager { private class TestSwapManager implements FlowFileSwapManager {
private final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>(); private final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>();
int swapOutCalledCount = 0; int swapOutCalledCount = 0;
int swapInCalledCount = 0; int swapInCalledCount = 0;
@Override @Override
public void initialize(final SwapManagerInitializationContext initializationContext) { public void initialize(final SwapManagerInitializationContext initializationContext) {
@ -187,11 +214,6 @@ public class TestStandardFlowFileQueue {
return new ArrayList<String>(swappedOut.keySet()); return new ArrayList<String>(swappedOut.keySet());
} }
@Override
public void dropSwappedFlowFiles(String swapLocation, final FlowFileQueue flowFileQueue, String user) {
}
@Override @Override
public QueueSize getSwapSize(String swapLocation) throws IOException { public QueueSize getSwapSize(String swapLocation) throws IOException {
final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation); final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
@ -252,6 +274,10 @@ public class TestStandardFlowFileQueue {
public TestFlowFile(final Map<String, String> attributes, final long size) { public TestFlowFile(final Map<String, String> attributes, final long size) {
this.attributes = attributes; this.attributes = attributes;
this.size = size; this.size = size;
if (!attributes.containsKey(CoreAttributes.UUID.key())) {
attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
}
} }