NIFI-730: Added error messages if we fail to drop FlowFiles from queue

This commit is contained in:
Mark Payne 2015-10-13 15:57:18 -04:00
parent 72ff2a25d5
commit afb76afcd0
3 changed files with 53 additions and 8 deletions

View File

@ -70,4 +70,9 @@ public interface DropFlowFileStatus {
* @return the current state of the operation
*/
DropFlowFileState getState();
/**
* @return the reason that the state is set to a Failure state, or <code>null</code> if the state is not {@link DropFlowFileState#FAILURE}.
*/
String getFailureReason();
}

View File

@ -30,6 +30,7 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
private volatile QueueSize droppedSize = new QueueSize(0, 0L);
private volatile long lastUpdated = System.currentTimeMillis();
private volatile Thread executionThread;
private volatile String failureReason;
private DropFlowFileState state = DropFlowFileState.WAITING_FOR_LOCK;
@ -85,8 +86,18 @@ public class DropFlowFileRequest implements DropFlowFileStatus {
return lastUpdated;
}
@Override
public String getFailureReason() {
return failureReason;
}
synchronized void setState(final DropFlowFileState state) {
setState(state, null);
}
synchronized void setState(final DropFlowFileState state, final String explanation) {
this.state = state;
this.failureReason = explanation;
this.lastUpdated = System.currentTimeMillis();
}

View File

@ -928,14 +928,34 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
try {
final List<FlowFileRecord> activeQueueRecords = new ArrayList<>(activeQueue);
QueueSize droppedSize = drop(activeQueueRecords, requestor);
QueueSize droppedSize;
try {
droppedSize = drop(activeQueueRecords, requestor);
} catch (final IOException ioe) {
logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
logger.error("", ioe);
dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString());
return;
}
activeQueue.clear();
activeQueueContentSize = 0;
activeQueueSizeRef.set(0);
dropRequest.setCurrentSize(getQueueSize());
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
droppedSize = drop(swapQueue, requestor);
try {
droppedSize = drop(swapQueue, requestor);
} catch (final IOException ioe) {
logger.error("Failed to drop the FlowFiles from queue {} due to {}", StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
logger.error("", ioe);
dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString());
return;
}
swapQueue.clear();
dropRequest.setCurrentSize(getQueueSize());
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
@ -946,12 +966,22 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
final Iterator<String> swapLocationItr = swapLocations.iterator();
while (swapLocationItr.hasNext()) {
final String swapLocation = swapLocationItr.next();
final List<FlowFileRecord> swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
List<FlowFileRecord> swappedIn = null;
try {
swappedIn = swapManager.swapIn(swapLocation, StandardFlowFileQueue.this);
droppedSize = drop(swappedIn, requestor);
} catch (final Exception e) {
activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue.
throw e;
} catch (final IOException ioe) {
logger.error("Failed to swap in FlowFiles from Swap File {} in order to drop the FlowFiles for Connection {} due to {}",
swapLocation, StandardFlowFileQueue.this.getIdentifier(), ioe.toString());
logger.error("", ioe);
dropRequest.setState(DropFlowFileState.FAILURE, "Failed to swap in FlowFiles from Swap File " + swapLocation + " due to " + ioe.toString());
if (swappedIn != null) {
activeQueue.addAll(swappedIn); // ensure that we don't lose the FlowFiles from our queue.
}
return;
}
dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize));
@ -963,8 +993,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
dropRequest.setState(DropFlowFileState.COMPLETE);
} catch (final Exception e) {
// TODO: Handle adequately
dropRequest.setState(DropFlowFileState.FAILURE);
dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + e.toString());
}
} finally {
writeLock.unlock("Drop FlowFiles");