From 77f7d7524cb8b07ed2976088f0e57d99233c8327 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 14 Oct 2015 09:14:15 -0400 Subject: [PATCH] NIFI-730: bug fixes and code cleanup for swap manager and flowfile queue --- .../repository/FlowFileSwapManager.java | 10 - .../nifi/controller/DropFlowFileRequest.java | 2 +- .../controller/FileSystemSwapManager.java | 5 - .../controller/StandardFlowFileQueue.java | 267 ++++++++++++------ .../controller/TestStandardFlowFileQueue.java | 40 ++- 5 files changed, 214 insertions(+), 110 deletions(-) rename nifi-nar-bundles/nifi-framework-bundle/nifi-framework/{nifi-framework-core-api => nifi-framework-core}/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java (81%) rename nifi-nar-bundles/nifi-framework-bundle/nifi-framework/{nifi-framework-core-api => nifi-framework-core}/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java (88%) diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java index a70d287435..3e341f856c 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java @@ -40,16 +40,6 @@ public interface FlowFileSwapManager { */ void initialize(SwapManagerInitializationContext initializationContext); - /** - * Drops all FlowFiles that are swapped out at the given location. This will update the Provenance - * Repository as well as the FlowFile Repository and - * - * @param swapLocation the location of the swap file to drop - * @param flowFileQueue the queue to which the FlowFiles belong - * @param user the user that initiated the request - */ - void dropSwappedFlowFiles(String swapLocation, FlowFileQueue flowFileQueue, String user) throws IOException; - /** * Swaps out the given FlowFiles that belong to the queue with the given identifier. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java index 189fe7deab..58695c2cfb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java @@ -64,7 +64,7 @@ public class DropFlowFileRequest implements DropFlowFileStatus { } void setCurrentSize(final QueueSize queueSize) { - this.currentSize = currentSize; + this.currentSize = queueSize; } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java index c4a86f2763..1162f390f7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java @@ -173,11 +173,6 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } } - @Override - public void dropSwappedFlowFiles(final String swapLocation, final FlowFileQueue flowFileQueue, final String user) throws IOException { - - } - @Override public List recoverSwapLocations(final FlowFileQueue flowFileQueue) throws IOException { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java similarity index 81% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 5b137f7f46..acf2830995 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -81,11 +80,16 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private static final Logger logger = LoggerFactory.getLogger(StandardFlowFileQueue.class); private PriorityQueue activeQueue = null; - private long activeQueueContentSize = 0L; private ArrayList swapQueue = null; - private int swappedRecordCount = 0; - private long swappedContentSize = 0L; + // private final AtomicInteger activeQueueSizeRef = new AtomicInteger(0); + // private long activeQueueContentSize = 0L; + // private int swappedRecordCount = 0; + // private long swappedContentSize = 0L; + // private final AtomicReference unacknowledgedSizeRef = new AtomicReference<>(new QueueSize(0, 0L)); + + private final AtomicReference size = new AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0L)); + private String maximumQueueDataSize; private long maximumQueueByteCount; private boolean swapMode = false; @@ -108,8 +112,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private final ResourceClaimManager resourceClaimManager; private final AtomicBoolean queueFullRef = new AtomicBoolean(false); - private final AtomicInteger activeQueueSizeRef = new AtomicInteger(0); - private final AtomicReference unacknowledgedSizeRef = new AtomicReference<>(new QueueSize(0, 0L)); // SCHEDULER CANNOT BE NOTIFIED OF EVENTS WITH THE WRITE LOCK HELD! DOING SO WILL RESULT IN A DEADLOCK! private final ProcessScheduler scheduler; @@ -208,49 +210,26 @@ public final class StandardFlowFileQueue implements FlowFileQueue { @Override public QueueSize size() { - readLock.lock(); - try { - return getQueueSize(); - } finally { - readLock.unlock("getSize"); - } + return getQueueSize(); } - /** - * MUST be called with lock held - * - * @return size of queue - */ - private QueueSize getQueueSize() { - final QueueSize unacknowledged = unacknowledgedSizeRef.get(); - return new QueueSize(activeQueue.size() + swappedRecordCount + unacknowledged.getObjectCount(), - activeQueueContentSize + swappedContentSize + unacknowledged.getByteCount()); + private QueueSize getQueueSize() { + return size.get().toQueueSize(); } @Override public boolean isEmpty() { - readLock.lock(); - try { - return activeQueue.isEmpty() && swappedRecordCount == 0 && unacknowledgedSizeRef.get().getObjectCount() == 0; - } finally { - readLock.unlock("isEmpty"); - } + return size.get().isEmpty(); } @Override public boolean isActiveQueueEmpty() { - final int activeQueueSize = activeQueueSizeRef.get(); - return activeQueueSize == 0; + return size.get().activeQueueCount == 0; } public QueueSize getActiveQueueSize() { - readLock.lock(); - try { - return new QueueSize(activeQueue.size(), activeQueueContentSize); - } finally { - readLock.unlock("getActiveQueueSize"); - } + return size.get().activeQueueSize(); } @Override @@ -258,13 +237,13 @@ public final class StandardFlowFileQueue implements FlowFileQueue { if (queueFullRef.get()) { writeLock.lock(); try { - updateUnacknowledgedSize(-1, -flowFile.getSize()); + incrementUnacknowledgedQueueSize(-1, -flowFile.getSize()); queueFullRef.set(determineIfFull()); } finally { writeLock.unlock("acknowledge(FlowFileRecord)"); } } else { - updateUnacknowledgedSize(-1, -flowFile.getSize()); + incrementUnacknowledgedQueueSize(-1, -flowFile.getSize()); } if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { @@ -284,13 +263,13 @@ public final class StandardFlowFileQueue implements FlowFileQueue { if (queueFullRef.get()) { writeLock.lock(); try { - updateUnacknowledgedSize(-flowFiles.size(), -totalSize); + incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize); queueFullRef.set(determineIfFull()); } finally { writeLock.unlock("acknowledge(FlowFileRecord)"); } } else { - updateUnacknowledgedSize(-flowFiles.size(), -totalSize); + incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize); } if (connection.getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) { @@ -335,18 +314,16 @@ public final class StandardFlowFileQueue implements FlowFileQueue { try { if (swapMode || activeQueue.size() >= swapThreshold) { swapQueue.add(file); - swappedContentSize += file.getSize(); - swappedRecordCount++; + incrementSwapQueueSize(1, file.getSize()); swapMode = true; writeSwapFilesIfNecessary(); } else { - activeQueueContentSize += file.getSize(); + incrementActiveQueueSize(1, file.getSize()); activeQueue.add(file); } queueFullRef.set(determineIfFull()); } finally { - activeQueueSizeRef.set(activeQueue.size()); writeLock.unlock("put(FlowFileRecord)"); } @@ -367,18 +344,16 @@ public final class StandardFlowFileQueue implements FlowFileQueue { try { if (swapMode || activeQueue.size() >= swapThreshold - numFiles) { swapQueue.addAll(files); - swappedContentSize += bytes; - swappedRecordCount += numFiles; + incrementSwapQueueSize(numFiles, bytes); swapMode = true; writeSwapFilesIfNecessary(); } else { - activeQueueContentSize += bytes; + incrementActiveQueueSize(numFiles, bytes); activeQueue.addAll(files); } queueFullRef.set(determineIfFull()); } finally { - activeQueueSizeRef.set(activeQueue.size()); writeLock.unlock("putAll"); } @@ -419,11 +394,10 @@ public final class StandardFlowFileQueue implements FlowFileQueue { flowFile = doPoll(expiredRecords, expirationMillis); return flowFile; } finally { - activeQueueSizeRef.set(activeQueue.size()); writeLock.unlock("poll(Set)"); if (flowFile != null) { - updateUnacknowledgedSize(1, flowFile.getSize()); + incrementUnacknowledgedQueueSize(1, flowFile.getSize()); } } } @@ -435,14 +409,19 @@ public final class StandardFlowFileQueue implements FlowFileQueue { migrateSwapToActive(); final boolean queueFullAtStart = queueFullRef.get(); + int expiredRecordCount = 0; + long expiredBytes = 0L; + do { flowFile = this.activeQueue.poll(); isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis)); if (isExpired) { expiredRecords.add(flowFile); + expiredRecordCount++; + expiredBytes += flowFile.getSize(); + if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) { - activeQueueContentSize -= flowFile.getSize(); break; } } else if (flowFile != null && flowFile.isPenalized()) { @@ -452,7 +431,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } if (flowFile != null) { - activeQueueContentSize -= flowFile.getSize(); + incrementActiveQueueSize(-1, -flowFile.getSize()); + } + + if (expiredRecordCount > 0) { + incrementActiveQueueSize(-expiredRecordCount, -expiredBytes); } } while (isExpired); @@ -475,7 +458,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { try { doPoll(records, maxResults, expiredRecords); } finally { - activeQueueSizeRef.set(activeQueue.size()); writeLock.unlock("poll(int, Set)"); } return records; @@ -493,8 +475,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { expiredBytes += record.getSize(); } - activeQueueContentSize -= bytesDrained; - updateUnacknowledgedSize(records.size(), bytesDrained - expiredBytes); + incrementActiveQueueSize(-(expiredRecords.size() + records.size()), -bytesDrained); + incrementUnacknowledgedQueueSize(records.size(), bytesDrained - expiredBytes); // if at least 1 FlowFile was expired & the queue was full before we started, then // we need to determine whether or not the queue is full again. If no FlowFile was expired, @@ -538,14 +520,12 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final String swapLocation = swapLocations.remove(0); try { final List swappedIn = swapManager.swapIn(swapLocation, this); - swappedRecordCount -= swappedIn.size(); long swapSize = 0L; for (final FlowFileRecord flowFile : swappedIn) { swapSize += flowFile.getSize(); } - swappedContentSize -= swapSize; - activeQueueContentSize += swapSize; - activeQueueSizeRef.set(activeQueue.size()); + incrementSwapQueueSize(-swappedIn.size(), -swapSize); + incrementActiveQueueSize(swappedIn.size(), swapSize); activeQueue.addAll(swappedIn); return; } catch (final FileNotFoundException fnfe) { @@ -567,28 +547,33 @@ public final class StandardFlowFileQueue implements FlowFileQueue { // this is the most common condition (nothing is swapped out), so do the check first and avoid the expense // of other checks for 99.999% of the cases. - if (swappedRecordCount == 0 && swapQueue.isEmpty()) { + if (size.get().swappedCount == 0 && swapQueue.isEmpty()) { return; } - if (swappedRecordCount > swapQueue.size()) { + if (size.get().swappedCount > swapQueue.size()) { // we already have FlowFiles swapped out, so we won't migrate the queue; we will wait for // an external process to swap FlowFiles back in. return; } + int recordsMigrated = 0; + long bytesMigrated = 0L; final Iterator swapItr = swapQueue.iterator(); while (activeQueue.size() < swapThreshold && swapItr.hasNext()) { final FlowFileRecord toMigrate = swapItr.next(); activeQueue.add(toMigrate); - activeQueueContentSize += toMigrate.getSize(); - swappedContentSize -= toMigrate.getSize(); - swappedRecordCount--; - + bytesMigrated += toMigrate.getSize(); + recordsMigrated++; swapItr.remove(); } - if (swappedRecordCount == 0) { + if (recordsMigrated > 0) { + incrementActiveQueueSize(recordsMigrated, bytesMigrated); + incrementSwapQueueSize(-recordsMigrated, -bytesMigrated); + } + + if (size.get().swappedCount == 0) { swapMode = false; } } @@ -603,18 +588,29 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final int numSwapFiles = swapQueue.size() / SWAP_RECORD_POLL_SIZE; + int originalSwapQueueCount = swapQueue.size(); + long originalSwapQueueBytes = 0L; + for (final FlowFileRecord flowFile : swapQueue) { + originalSwapQueueBytes += flowFile.getSize(); + } + // Create a new Priority queue with the prioritizers that are set, but reverse the // prioritizers because we want to pull the lowest-priority FlowFiles to swap out final PriorityQueue tempQueue = new PriorityQueue<>(activeQueue.size() + swapQueue.size(), Collections.reverseOrder(new Prioritizer(priorities))); tempQueue.addAll(activeQueue); tempQueue.addAll(swapQueue); + long bytesSwappedOut = 0L; + int flowFilesSwappedOut = 0; final List swapLocations = new ArrayList<>(numSwapFiles); for (int i = 0; i < numSwapFiles; i++) { // Create a new swap file for the next SWAP_RECORD_POLL_SIZE records final List toSwap = new ArrayList<>(SWAP_RECORD_POLL_SIZE); for (int j = 0; j < SWAP_RECORD_POLL_SIZE; j++) { - toSwap.add(tempQueue.poll()); + final FlowFileRecord flowFile = tempQueue.poll(); + toSwap.add(flowFile); + bytesSwappedOut += flowFile.getSize(); + flowFilesSwappedOut++; } try { @@ -639,9 +635,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue { // Pull any records off of the temp queue that won't fit back on the active queue, and add those to the // swap queue. Then add the records back to the active queue. swapQueue.clear(); + long updatedSwapQueueBytes = 0L; while (tempQueue.size() > swapThreshold) { final FlowFileRecord record = tempQueue.poll(); swapQueue.add(record); + updatedSwapQueueBytes += record.getSize(); } Collections.reverse(swapQueue); // currently ordered in reverse priority order based on the ordering of the temp queue @@ -649,9 +647,25 @@ public final class StandardFlowFileQueue implements FlowFileQueue { // replace the contents of the active queue, since we've merged it with the swap queue. activeQueue.clear(); FlowFileRecord toRequeue; + long activeQueueBytes = 0L; while ((toRequeue = tempQueue.poll()) != null) { activeQueue.offer(toRequeue); + activeQueueBytes += toRequeue.getSize(); } + + boolean updated = false; + while (!updated) { + final FlowFileQueueSize originalSize = size.get(); + + final int addedSwapRecords = swapQueue.size() - originalSwapQueueCount; + final long addedSwapBytes = updatedSwapQueueBytes - originalSwapQueueBytes; + + final FlowFileQueueSize newSize = new FlowFileQueueSize(activeQueue.size(), activeQueueBytes, + originalSize.swappedCount + addedSwapRecords + flowFilesSwappedOut, originalSize.swappedBytes + addedSwapBytes + bytesSwappedOut, + originalSize.unacknowledgedCount, originalSize.unacknowledgedBytes); + updated = size.compareAndSet(originalSize, newSize); + } + this.swapLocations.addAll(swapLocations); } @@ -682,6 +696,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue { @Override public List poll(final FlowFileFilter filter, final Set expiredRecords) { + long bytesPulled = 0L; + int flowFilesPulled = 0; + writeLock.lock(); try { migrateSwapToActive(); @@ -701,7 +718,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final boolean isExpired = isLaterThan(getExpirationDate(flowFile, expirationMillis)); if (isExpired) { expiredRecords.add(flowFile); - activeQueueContentSize -= flowFile.getSize(); + bytesPulled += flowFile.getSize(); + flowFilesPulled++; if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) { break; @@ -716,9 +734,10 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final FlowFileFilterResult result = filter.filter(flowFile); if (result.isAccept()) { - activeQueueContentSize -= flowFile.getSize(); + bytesPulled += flowFile.getSize(); + flowFilesPulled++; - updateUnacknowledgedSize(1, flowFile.getSize()); + incrementUnacknowledgedQueueSize(1, flowFile.getSize()); selectedFlowFiles.add(flowFile); } else { unselected.add(flowFile); @@ -740,7 +759,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { return selectedFlowFiles; } finally { - activeQueueSizeRef.set(activeQueue.size()); + incrementActiveQueueSize(-flowFilesPulled, -bytesPulled); writeLock.unlock("poll(Filter, Set)"); } } @@ -880,8 +899,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } } - this.swappedRecordCount = swapFlowFileCount; - this.swappedContentSize = swapByteCount; + incrementSwapQueueSize(swapFlowFileCount, swapByteCount); this.swapLocations.addAll(swapLocations); } finally { writeLock.unlock("Recover Swap Files"); @@ -900,6 +918,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue { @Override public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, final String requestor) { + logger.info("Initiating drop of FlowFiles from {} on behalf of {} (request identifier={})", this, requestor, requestIdentifier); + // purge any old requests from the map just to keep it clean. But if there are very requests, which is usually the case, then don't bother if (dropRequestMap.size() > 10) { final List toDrop = new ArrayList<>(); @@ -924,6 +944,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { writeLock.lock(); try { dropRequest.setState(DropFlowFileState.DROPPING_ACTIVE_FLOWFILES); + logger.debug("For DropFlowFileRequest {}, original size is {}", requestIdentifier, getQueueSize()); dropRequest.setOriginalSize(getQueueSize()); try { @@ -932,6 +953,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { QueueSize droppedSize; try { droppedSize = drop(activeQueueRecords, requestor); + logger.debug("For DropFlowFileRequest {}, Dropped {} from active queue", requestIdentifier, droppedSize); } catch (final IOException ioe) { logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString()); logger.error("", ioe); @@ -941,12 +963,15 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } activeQueue.clear(); - activeQueueContentSize = 0; - activeQueueSizeRef.set(0); + incrementActiveQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount()); dropRequest.setCurrentSize(getQueueSize()); dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); try { + final QueueSize swapSize = size.get().swapQueueSize(); + + logger.debug("For DropFlowFileRequest {}, Swap Queue has {} elements, Swapped Record Count = {}, Swapped Content Size = {}", + requestIdentifier, swapQueue.size(), swapSize.getObjectCount(), swapSize.getByteCount()); droppedSize = drop(swapQueue, requestor); } catch (final IOException ioe) { logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString()); @@ -960,9 +985,10 @@ public final class StandardFlowFileQueue implements FlowFileQueue { dropRequest.setCurrentSize(getQueueSize()); dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); swapMode = false; - swappedContentSize -= droppedSize.getByteCount(); - swappedRecordCount -= droppedSize.getObjectCount(); + incrementSwapQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount()); + logger.debug("For DropFlowFileRequest {}, dropped {} from Swap Queue", requestIdentifier, droppedSize); + final int swapFileCount = swapLocations.size(); final Iterator swapLocationItr = swapLocations.iterator(); while (swapLocationItr.hasNext()) { final String swapLocation = swapLocationItr.next(); @@ -985,14 +1011,20 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); - swappedContentSize -= droppedSize.getByteCount(); - swappedRecordCount -= droppedSize.getObjectCount(); + incrementSwapQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount()); + dropRequest.setCurrentSize(getQueueSize()); swapLocationItr.remove(); + logger.debug("For DropFlowFileRequest {}, dropped {} for Swap File {}", requestIdentifier, droppedSize, swapLocation); } + logger.debug("Dropped FlowFiles from {} Swap Files", swapFileCount); + logger.info("Successfully dropped {} FlowFiles ({} bytes) from Connection with ID {} on behalf of {}", + dropRequest.getDroppedSize().getObjectCount(), dropRequest.getDroppedSize().getByteCount(), StandardFlowFileQueue.this.getIdentifier(), requestor); dropRequest.setState(DropFlowFileState.COMPLETE); } catch (final Exception e) { + logger.error("Failed to drop FlowFiles from Connection with ID {} due to {}", StandardFlowFileQueue.this.getIdentifier(), e.toString()); + logger.error("", e); dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + e.toString()); } } finally { @@ -1020,6 +1052,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { long dropContentSize = 0L; for (final FlowFileRecord flowFile : flowFiles) { + dropContentSize += flowFile.getSize(); final ContentClaim contentClaim = flowFile.getContentClaim(); if (contentClaim == null) { continue; @@ -1031,7 +1064,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } resourceClaimManager.decrementClaimantCount(resourceClaim); - dropContentSize += flowFile.getSize(); } provRepository.registerEvents(provenanceEvents); @@ -1138,16 +1170,77 @@ public final class StandardFlowFileQueue implements FlowFileQueue { @Override public QueueSize getUnacknowledgedQueueSize() { - return unacknowledgedSizeRef.get(); + return size.get().unacknowledgedQueueSize(); } - private void updateUnacknowledgedSize(final int addToCount, final long addToSize) { - boolean updated = false; - do { - final QueueSize queueSize = unacknowledgedSizeRef.get(); - final QueueSize newSize = new QueueSize(queueSize.getObjectCount() + addToCount, queueSize.getByteCount() + addToSize); - updated = unacknowledgedSizeRef.compareAndSet(queueSize, newSize); - } while (!updated); + private void incrementActiveQueueSize(final int count, final long bytes) { + boolean updated = false; + while (!updated) { + final FlowFileQueueSize original = size.get(); + final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount + count, original.activeQueueBytes + bytes, + original.swappedCount, original.swappedBytes, original.unacknowledgedCount, original.unacknowledgedBytes); + updated = size.compareAndSet(original, newSize); + } + } + + private void incrementSwapQueueSize(final int count, final long bytes) { + boolean updated = false; + while (!updated) { + final FlowFileQueueSize original = size.get(); + final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount, original.activeQueueBytes, + original.swappedCount + count, original.swappedBytes + bytes, original.unacknowledgedCount, original.unacknowledgedBytes); + updated = size.compareAndSet(original, newSize); + } + } + + private void incrementUnacknowledgedQueueSize(final int count, final long bytes) { + boolean updated = false; + while (!updated) { + final FlowFileQueueSize original = size.get(); + final FlowFileQueueSize newSize = new FlowFileQueueSize(original.activeQueueCount, original.activeQueueBytes, + original.swappedCount, original.swappedBytes, original.unacknowledgedCount + count, original.unacknowledgedBytes + bytes); + updated = size.compareAndSet(original, newSize); + } + } + + + private static class FlowFileQueueSize { + private final int activeQueueCount; + private final long activeQueueBytes; + private final int swappedCount; + private final long swappedBytes; + private final int unacknowledgedCount; + private final long unacknowledgedBytes; + + public FlowFileQueueSize(final int activeQueueCount, final long activeQueueBytes, final int swappedCount, final long swappedBytes, + final int unacknowledgedCount, final long unacknowledgedBytes) { + this.activeQueueCount = activeQueueCount; + this.activeQueueBytes = activeQueueBytes; + this.swappedCount = swappedCount; + this.swappedBytes = swappedBytes; + this.unacknowledgedCount = unacknowledgedCount; + this.unacknowledgedBytes = unacknowledgedBytes; + } + + public boolean isEmpty() { + return activeQueueCount == 0 && swappedCount == 0 && unacknowledgedCount == 0; + } + + public QueueSize toQueueSize() { + return new QueueSize(activeQueueCount + swappedCount + unacknowledgedCount, activeQueueBytes + swappedBytes + unacknowledgedBytes); + } + + public QueueSize activeQueueSize() { + return new QueueSize(activeQueueCount, activeQueueBytes); + } + + public QueueSize unacknowledgedQueueSize() { + return new QueueSize(unacknowledgedCount, unacknowledgedBytes); + } + + public QueueSize swapQueueSize() { + return new QueueSize(swappedCount, swappedBytes); + } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java similarity index 88% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index 66f32d8769..3789ea514d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -34,6 +34,8 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.queue.DropFlowFileState; +import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.repository.FlowFileRecord; @@ -44,7 +46,9 @@ import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -66,6 +70,8 @@ public class TestStandardFlowFileQueue { final ProvenanceEventRepository provRepo = Mockito.mock(ProvenanceEventRepository.class); final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class); + Mockito.when(provRepo.eventBuilder()).thenReturn(new StandardProvenanceEventRecord.Builder()); + queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000); TestFlowFile.idGenerator.set(0L); } @@ -148,16 +154,37 @@ public class TestStandardFlowFileQueue { assertTrue(swapManager.swappedOut.isEmpty()); queue.poll(exp); - } + @Test + public void testDropSwappedFlowFiles() { + for (int i = 1; i <= 210000; i++) { + queue.put(new TestFlowFile()); + } + + assertEquals(20, swapManager.swappedOut.size()); + final DropFlowFileStatus status = queue.dropFlowFiles("1", "Unit Test"); + while (status.getState() != DropFlowFileState.COMPLETE) { + final QueueSize queueSize = queue.size(); + System.out.println(queueSize); + try { + Thread.sleep(1000L); + } catch (final Exception e) { + } + } + + System.out.println(queue.size()); + assertEquals(0, queue.size().getObjectCount()); + assertEquals(0, queue.size().getByteCount()); + assertEquals(0, swapManager.swappedOut.size()); + assertEquals(20, swapManager.swapInCalledCount); + } private class TestSwapManager implements FlowFileSwapManager { private final Map> swappedOut = new HashMap<>(); int swapOutCalledCount = 0; int swapInCalledCount = 0; - @Override public void initialize(final SwapManagerInitializationContext initializationContext) { @@ -187,11 +214,6 @@ public class TestStandardFlowFileQueue { return new ArrayList(swappedOut.keySet()); } - @Override - public void dropSwappedFlowFiles(String swapLocation, final FlowFileQueue flowFileQueue, String user) { - - } - @Override public QueueSize getSwapSize(String swapLocation) throws IOException { final List flowFiles = swappedOut.get(swapLocation); @@ -252,6 +274,10 @@ public class TestStandardFlowFileQueue { public TestFlowFile(final Map attributes, final long size) { this.attributes = attributes; this.size = size; + + if (!attributes.containsKey(CoreAttributes.UUID.key())) { + attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString()); + } }