NIFI-730: Added additional parameters to dropFlowFiles

This commit is contained in:
Mark Payne 2015-10-13 12:20:18 -04:00
parent 09d6fe5cdb
commit af78354d84
5 changed files with 48 additions and 14 deletions

View File

@ -60,6 +60,12 @@ public interface DropFlowFileStatus {
*/
QueueSize getCurrentSize();
/**
* @return a QueueSize representing the number of FlowFiles that have been dropped for this request
* and the aggregate size of those FlowFiles
*/
QueueSize getDroppedSize();
/**
* @return the current state of the operation
*/

View File

@ -181,9 +181,12 @@ public interface FlowFileQueue {
* passed to the {@link #getDropFlowFileStatus(String)} and {@link #cancelDropFlowFileStatus(String)}
* methods in order to obtain the status later or cancel a request
*
* @param requestor the entity that is requesting that the FlowFiles be dropped; this will be
* included in the Provenance Events that are generated.
*
* @return the status of the drop request.
*/
DropFlowFileStatus dropFlowFiles();
DropFlowFileStatus dropFlowFiles(String requestIdentifier, String requestor);
/**
* Returns the current status of a Drop FlowFile Request that was initiated via the

View File

@ -48,6 +48,20 @@ public class QueueSize {
return totalSizeBytes;
}
/**
* Returns a new QueueSize that is the sum of this QueueSize and the provided QueueSize
*
* @param other the other QueueSize to add to this QueueSize
* @return a new QueueSize that is the sum of this QueueSize and the provided QueueSize
*/
public QueueSize add(final QueueSize other) {
if (other == null) {
return new QueueSize(objectCount, totalSizeBytes);
}
return new QueueSize(objectCount + other.getObjectCount(), totalSizeBytes + other.getByteCount());
}
@Override
public String toString() {
return "QueueSize[FlowFiles=" + objectCount + ", ContentSize=" + NumberFormat.getNumberInstance().format(totalSizeBytes) + " Bytes]";

View File

@ -27,6 +27,7 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
private volatile QueueSize originalSize;
private volatile QueueSize currentSize;
private volatile QueueSize droppedSize = new QueueSize(0, 0L);
private volatile long lastUpdated = System.currentTimeMillis();
private volatile Thread executionThread;
@ -65,6 +66,15 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
this.currentSize = currentSize;
}
@Override
public QueueSize getDroppedSize() {
return droppedSize;
}
void setDroppedSize(final QueueSize droppedSize) {
this.droppedSize = droppedSize;
}
@Override
public DropFlowFileState getState() {
return state;

View File

@ -29,7 +29,6 @@ import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@ -900,7 +899,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>();
@Override
public DropFlowFileStatus dropFlowFiles() {
public DropFlowFileStatus dropFlowFiles(final String requestIdentifier, final String requestor) {
// purge any old requests from the map just to keep it clean. But if there are very requests, which is usually the case, then don't bother
if (dropRequestMap.size() > 10) {
final List<String> toDrop = new ArrayList<>();
@ -918,10 +917,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
}
}
// TODO: get user name!
final String userName = null;
final String requestIdentifier = UUID.randomUUID().toString();
final DropFlowFileRequest dropRequest = new DropFlowFileRequest(requestIdentifier);
final Thread t = new Thread(new Runnable() {
@Override
@ -932,20 +927,23 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
try {
final List<FlowFileRecord> activeQueueRecords = new ArrayList<>(activeQueue);
drop(activeQueueRecords, userName);
QueueSize droppedSize = drop(activeQueueRecords, requestor);
activeQueue.clear();
dropRequest.setCurrentSize(getQueueSize());
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
drop(swapQueue, userName);
droppedSize = drop(swapQueue, requestor);
swapQueue.clear();
dropRequest.setCurrentSize(getQueueSize());
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
final Iterator<String> swapLocationItr = swapLocations.iterator();
while (swapLocationItr.hasNext()) {
final String swapLocation = swapLocationItr.next();
final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
try {
drop(swappedIn, userName);
droppedSize = drop(swappedIn, requestor);
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
} catch (final Exception e) {
activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue.
throw e;
@ -974,15 +972,16 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
return dropRequest;
}
private void drop(final List<FlowFileRecord> flowFiles, final String user) throws IOException {
private QueueSize drop(final List<FlowFileRecord> flowFiles, final String requestor) throws IOException {
// Create a Provenance Event and a FlowFile Repository record for each FlowFile
final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>(flowFiles.size());
final List<RepositoryRecord> flowFileRepoRecords = new ArrayList<>(flowFiles.size());
for (final FlowFileRecord flowFile : flowFiles) {
provenanceEvents.add(createDropEvent(flowFile, user));
provenanceEvents.add(createDropEvent(flowFile, requestor));
flowFileRepoRecords.add(createDeleteRepositoryRecord(flowFile));
}
long dropContentSize = 0L;
for (final FlowFileRecord flowFile : flowFiles) {
final ContentClaim contentClaim = flowFile.getContentClaim();
if (contentClaim == null) {
@ -995,20 +994,22 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
}
resourceClaimManager.decrementClaimantCount(resourceClaim);
dropContentSize += flowFile.getSize();
}
provRepository.registerEvents(provenanceEvents);
flowFileRepository.updateRepository(flowFileRepoRecords);
return new QueueSize(flowFiles.size(), dropContentSize);
}
private ProvenanceEventRecord createDropEvent(final FlowFileRecord flowFile, final String user) {
private ProvenanceEventRecord createDropEvent(final FlowFileRecord flowFile, final String requestor) {
final ProvenanceEventBuilder builder = provRepository.eventBuilder();
builder.fromFlowFile(flowFile);
builder.setEventType(ProvenanceEventType.DROP);
builder.setLineageStartDate(flowFile.getLineageStartDate());
builder.setComponentId(getIdentifier());
builder.setComponentType("Connection");
builder.setDetails("FlowFile manually dropped by user " + user);
builder.setDetails("FlowFile manually dropped; request made by " + requestor);
return builder.build();
}