NIFI-1830: Fixed problems in the merging logic for Drop FlowFile Requests. This closes #394

This commit is contained in:
Mark Payne 2016-04-29 16:07:37 -04:00 committed by Matt Gilman
parent ff98d823e2
commit 45ca978498
1 changed files with 18 additions and 1 deletions

View File

@ -2990,9 +2990,18 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
long droppedSize = 0; long droppedSize = 0;
DropFlowFileState state = null; DropFlowFileState state = null;
boolean allFinished = true;
String failureReason = null;
for (final Map.Entry<NodeIdentifier, DropRequestDTO> nodeEntry : dropRequestMap.entrySet()) { for (final Map.Entry<NodeIdentifier, DropRequestDTO> nodeEntry : dropRequestMap.entrySet()) {
final DropRequestDTO nodeDropRequest = nodeEntry.getValue(); final DropRequestDTO nodeDropRequest = nodeEntry.getValue();
if (!nodeDropRequest.isFinished()) {
allFinished = false;
}
if (nodeDropRequest.getFailureReason() != null) {
failureReason = nodeDropRequest.getFailureReason();
}
currentCount += nodeDropRequest.getCurrentCount(); currentCount += nodeDropRequest.getCurrentCount();
currentSize += nodeDropRequest.getCurrentSize(); currentSize += nodeDropRequest.getCurrentSize();
droppedCount += nodeDropRequest.getDroppedCount(); droppedCount += nodeDropRequest.getDroppedCount();
@ -3006,7 +3015,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
} }
final DropFlowFileState nodeState = DropFlowFileState.valueOfDescription(nodeDropRequest.getState()); final DropFlowFileState nodeState = DropFlowFileState.valueOfDescription(nodeDropRequest.getState());
if (state == null || state.compareTo(nodeState) > 0) { if (state == null || state.ordinal() > nodeState.ordinal()) {
state = nodeState; state = nodeState;
} }
} }
@ -3019,6 +3028,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
dropRequest.setDroppedSize(droppedSize); dropRequest.setDroppedSize(droppedSize);
dropRequest.setDropped(FormatUtils.formatCount(droppedCount) + " / " + FormatUtils.formatDataSize(droppedSize)); dropRequest.setDropped(FormatUtils.formatCount(droppedCount) + " / " + FormatUtils.formatDataSize(droppedSize));
dropRequest.setFinished(allFinished);
dropRequest.setFailureReason(failureReason);
if (originalCount == 0) {
dropRequest.setPercentCompleted(allFinished ? 100 : 0);
} else {
dropRequest.setPercentCompleted((int) ((double) droppedCount / (double) originalCount * 100D));
}
if (!nodeWaiting) { if (!nodeWaiting) {
dropRequest.setOriginalCount(originalCount); dropRequest.setOriginalCount(originalCount);
dropRequest.setOriginalSize(originalSize); dropRequest.setOriginalSize(originalSize);