diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java index 3c3be9b6d5..7d5b9c2935 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/DropFlowFileStatus.java @@ -60,6 +60,12 @@ public interface DropFlowFileStatus { */ QueueSize getCurrentSize(); + /** + * @return a QueueSize representing the number of FlowFiles that have been dropped for this request + * and the aggregate size of those FlowFiles + */ + QueueSize getDroppedSize(); + /** * @return the current state of the operation */ diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java index bc2f3585eb..2d67d58b0c 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java @@ -181,9 +181,12 @@ public interface FlowFileQueue { * passed to the {@link #getDropFlowFileStatus(String)} and {@link #cancelDropFlowFileStatus(String)} * methods in order to obtain the status later or cancel a request * + * @param requestor the entity that is requesting that the FlowFiles be dropped; this will be + * included in the Provenance Events that are generated. + * * @return the status of the drop request. */ - DropFlowFileStatus dropFlowFiles(); + DropFlowFileStatus dropFlowFiles(String requestIdentifier, String requestor); /** * Returns the current status of a Drop FlowFile Request that was initiated via the diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java index 528d652278..7998d37537 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java @@ -48,6 +48,20 @@ public class QueueSize { return totalSizeBytes; } + /** + * Returns a new QueueSize that is the sum of this QueueSize and the provided QueueSize + * + * @param other the other QueueSize to add to this QueueSize + * @return a new QueueSize that is the sum of this QueueSize and the provided QueueSize + */ + public QueueSize add(final QueueSize other) { + if (other == null) { + return new QueueSize(objectCount, totalSizeBytes); + } + + return new QueueSize(objectCount + other.getObjectCount(), totalSizeBytes + other.getByteCount()); + } + @Override public String toString() { return "QueueSize[FlowFiles=" + objectCount + ", ContentSize=" + NumberFormat.getNumberInstance().format(totalSizeBytes) + " Bytes]"; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java index 609fe75bb6..410430874a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/DropFlowFileRequest.java @@ -27,6 +27,7 @@ public class DropFlowFileRequest implements DropFlowFileStatus { private volatile QueueSize originalSize; private volatile QueueSize currentSize; + private volatile QueueSize droppedSize = new QueueSize(0, 0L); private volatile long lastUpdated = System.currentTimeMillis(); private volatile Thread executionThread; @@ -65,6 +66,15 @@ public class DropFlowFileRequest implements DropFlowFileStatus { this.currentSize = currentSize; } + @Override + public QueueSize getDroppedSize() { + return droppedSize; + } + + void setDroppedSize(final QueueSize droppedSize) { + this.droppedSize = droppedSize; + } + @Override public DropFlowFileState getState() { return state; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 073e5fb9d9..b699ceb266 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.PriorityQueue; import java.util.Queue; import java.util.Set; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -900,7 +899,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private final ConcurrentMap dropRequestMap = new ConcurrentHashMap<>(); @Override - public DropFlowFileStatus dropFlowFiles() { + public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, final String requestor) { // purge any old requests from the map just to keep it clean. But if there are very requests, which is usually the case, then don't bother if (dropRequestMap.size() > 10) { final List toDrop = new ArrayList<>(); @@ -918,10 +917,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } } - // TODO: get user name! - final String userName = null; - - final String requestIdentifier = UUID.randomUUID().toString(); final DropFlowFileRequest dropRequest = new DropFlowFileRequest(requestIdentifier); final Thread t = new Thread(new Runnable() { @Override @@ -932,20 +927,23 @@ public final class StandardFlowFileQueue implements FlowFileQueue { try { final List activeQueueRecords = new ArrayList<>(activeQueue); - drop(activeQueueRecords, userName); + QueueSize droppedSize = drop(activeQueueRecords, requestor); activeQueue.clear(); dropRequest.setCurrentSize(getQueueSize()); + dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); - drop(swapQueue, userName); + droppedSize = drop(swapQueue, requestor); swapQueue.clear(); dropRequest.setCurrentSize(getQueueSize()); + dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); final Iterator swapLocationItr = swapLocations.iterator(); while (swapLocationItr.hasNext()) { final String swapLocation = swapLocationItr.next(); final List swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this); try { - drop(swappedIn, userName); + droppedSize = drop(swappedIn, requestor); + dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); } catch (final Exception e) { activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue. throw e; @@ -974,15 +972,16 @@ public final class StandardFlowFileQueue implements FlowFileQueue { return dropRequest; } - private void drop(final List flowFiles, final String user) throws IOException { + private QueueSize drop(final List flowFiles, final String requestor) throws IOException { // Create a Provenance Event and a FlowFile Repository record for each FlowFile final List provenanceEvents = new ArrayList<>(flowFiles.size()); final List flowFileRepoRecords = new ArrayList<>(flowFiles.size()); for (final FlowFileRecord flowFile : flowFiles) { - provenanceEvents.add(createDropEvent(flowFile, user)); + provenanceEvents.add(createDropEvent(flowFile, requestor)); flowFileRepoRecords.add(createDeleteRepositoryRecord(flowFile)); } + long dropContentSize = 0L; for (final FlowFileRecord flowFile : flowFiles) { final ContentClaim contentClaim = flowFile.getContentClaim(); if (contentClaim == null) { @@ -995,20 +994,22 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } resourceClaimManager.decrementClaimantCount(resourceClaim); + dropContentSize += flowFile.getSize(); } provRepository.registerEvents(provenanceEvents); flowFileRepository.updateRepository(flowFileRepoRecords); + return new QueueSize(flowFiles.size(), dropContentSize); } - private ProvenanceEventRecord createDropEvent(final FlowFileRecord flowFile, final String user) { + private ProvenanceEventRecord createDropEvent(final FlowFileRecord flowFile, final String requestor) { final ProvenanceEventBuilder builder = provRepository.eventBuilder(); builder.fromFlowFile(flowFile); builder.setEventType(ProvenanceEventType.DROP); builder.setLineageStartDate(flowFile.getLineageStartDate()); builder.setComponentId(getIdentifier()); builder.setComponentType("Connection"); - builder.setDetails("FlowFile manually dropped by user " + user); + builder.setDetails("FlowFile manually dropped; request made by " + requestor); return builder.build(); }