NIFI-730: Return DropFlowFileStatus object when calling cancel

This commit is contained in:
Mark Payne 2015-10-14 09:46:21 -04:00
parent 77f7d7524c
commit 0af1acaafa
3 changed files with 22 additions and 14 deletions

View File

@ -199,11 +199,13 @@ public interface FlowFileQueue {
DropFlowFileStatus getDropFlowFileStatus(String requestIdentifier);
/**
* Cancels the request to drop FlowFiles that has the given identifier
* Cancels the request to drop FlowFiles that has the given identifier. After this method is called, the request
* will no longer be known by this queue, so subsequent calls to {@link #getDropFlowFileStatus(String)} or
* {@link #cancelDropFlowFileRequest(String)} will return <code>null</code>
*
* @param requestIdentifier the identifier of the Drop FlowFile Request
* @return <code>true</code> if the request was canceled, <code>false</code> if the request has
* already completed or is not known
* @return the status for the request with the given identifier after it has been canceled, or <code>null</code> if no
* request status exists with that identifier
*/
boolean cancelDropFlowFileRequest(String requestIdentifier);
DropFlowFileStatus cancelDropFlowFileRequest(String requestIdentifier);
}

View File

@ -1078,7 +1078,16 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
builder.setLineageStartDate(flowFile.getLineageStartDate());
builder.setComponentId(getIdentifier());
builder.setComponentType("Connection");
builder.setDetails("FlowFile manually dropped; request made by " + requestor);
builder.setAttributes(flowFile.getAttributes(), Collections.<String, String> emptyMap());
builder.setDetails("Manually dropped by " + requestor);
builder.setSourceQueueIdentifier(getIdentifier());
final ContentClaim contentClaim = flowFile.getContentClaim();
if (contentClaim != null) {
final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
builder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), contentClaim.getOffset(), flowFile.getSize());
}
return builder.build();
}
@ -1138,14 +1147,14 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override
public boolean cancelDropFlowFileRequest(final String requestIdentifier) {
public DropFlowFileRequest cancelDropFlowFileRequest(final String requestIdentifier) {
final DropFlowFileRequest request = dropRequestMap.remove(requestIdentifier);
if (request == null) {
return false;
return null;
}
final boolean successful = request.cancel();
return successful;
request.cancel();
return request;
}
@Override

View File

@ -156,7 +156,7 @@ public class TestStandardFlowFileQueue {
queue.poll(exp);
}
@Test
@Test(timeout = 20000)
public void testDropSwappedFlowFiles() {
for (int i = 1; i <= 210000; i++) {
queue.put(new TestFlowFile());
@ -165,15 +165,12 @@ public class TestStandardFlowFileQueue {
assertEquals(20, swapManager.swappedOut.size());
final DropFlowFileStatus status = queue.dropFlowFiles("1", "Unit Test");
while (status.getState() != DropFlowFileState.COMPLETE) {
final QueueSize queueSize = queue.size();
System.out.println(queueSize);
try {
Thread.sleep(1000L);
Thread.sleep(100L);
} catch (final Exception e) {
}
}
System.out.println(queue.size());
assertEquals(0, queue.size().getObjectCount());
assertEquals(0, queue.size().getByteCount());
assertEquals(0, swapManager.swappedOut.size());