diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java index acc9e0ad43..2f917bee17 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java @@ -46,8 +46,11 @@ public class StandardDataValve implements DataValve { private final ProcessGroup processGroup; private final StateManager stateManager; - private Set<String> groupsWithDataFlowingIn = new HashSet<>(); - private Set<String> groupsWithDataFlowingOut = new HashSet<>(); + private final Set<String> groupsWithDataFlowingIn = new HashSet<>(); + private final Set<String> groupsWithDataFlowingOut = new HashSet<>(); + + private boolean leftOpenDueToDataQueued = false; + public StandardDataValve(final ProcessGroup processGroup, final StateManager stateManager) { this.processGroup = processGroup; @@ -67,30 +70,46 @@ public class StandardDataValve implements DataValve { return true; } - final String reasonForNotAllowing = getReasonFlowIntoGroupNotAllowed(destinationGroup); + final FlowInForbiddenReason reasonForNotAllowing = getReasonFlowIntoGroupNotAllowed(destinationGroup); + + // If we are forbidding data to flow into the group due to the fact that data is currently allowed to flow out of the group, + // and the valve was left open due to data being queued, let's verify that there is actually data queued up to flow out at the moment. + // If there is not, remove the group from those that are currently allowing data to flow out. 
This can happen in the following situation: + // - A FlowFile comes into the group + // - The FlowFile is split into two FlowFiles + // - One of the FlowFiles is routed to the Output Port, while the other is routed elsewhere + // - The Output Port is triggered. It opens the valve to allow data to flow out of the group. + // - The Output Port goes to close flow out of the group. However, the group is not empty, so the valve is not closed. + // - The other FlowFile is never routed to an output port. Instead, it is auto-terminated by some processor. + // Now, the valve has been left open. + // In this case, though, when the Output Port failed to close the valve, this.leftOpenDueToDataQueued was set to true. If that is the case, + // we can go ahead and close the valve now, if there's no more data queued. + if (reasonForNotAllowing == FlowInForbiddenReason.OPEN_FOR_OUTPUT && leftOpenDueToDataQueued && !destinationGroup.isDataQueued()) { + groupsWithDataFlowingOut.remove(destinationGroup.getIdentifier()); + } + if (reasonForNotAllowing != null) { // Since there is a reason not to allow it, return false. The reason has already been logged at a DEBUG level. return false; } - logger.debug("Opening valve to allow data to flow into {}", destinationGroup); groupsWithDataFlowingIn.add(destinationGroup.getIdentifier()); storeState(); return true; } - private String getReasonFlowIntoGroupNotAllowed(final ProcessGroup destinationGroup) { + private FlowInForbiddenReason getReasonFlowIntoGroupNotAllowed(final ProcessGroup destinationGroup) { if (destinationGroup.isDataQueued()) { // If the destination group already has data queued up, and the valve is not already open, do not allow data to // flow into the group. If we did, we would end up mixing together two different batches of data. 
logger.trace("Will not allow data to flow into {} because valve is not already open and the Process Group has data queued", destinationGroup); - return "Process Group already has data queued and valve is not already allowing data into group"; + return FlowInForbiddenReason.DATA_QUEUED; } if (destinationGroup.getFlowFileOutboundPolicy() == FlowFileOutboundPolicy.BATCH_OUTPUT && groupsWithDataFlowingOut.contains(destinationGroup.getIdentifier())) { logger.trace("Will not allow data to flow into {} because Outbound Policy is Batch Output and valve is already open to allow data to flow out of group", destinationGroup); - return "Data Valve is already allowing data to flow out of group"; + return FlowInForbiddenReason.OPEN_FOR_OUTPUT; } for (final Port port : destinationGroup.getInputPorts()) { @@ -109,7 +128,7 @@ public class StandardDataValve implements DataValve { if (Boolean.TRUE.equals(flowingOutOfSourceGroup)) { logger.trace("Will not allow data to flow into {} because port {} has an incoming connection from {} and that Process Group is currently allowing data to flow out", destinationGroup, port, sourceConnectable); - return "Source connected to Input Port is an Output Port with Batch Output and is currently allowing data to flow out"; + return FlowInForbiddenReason.SOURCE_FLOWING_OUT; } } } @@ -161,6 +180,10 @@ public class StandardDataValve implements DataValve { logger.debug("Opening valve to allow data to flow out of {}", sourceGroup); groupsWithDataFlowingOut.add(sourceGroup.getIdentifier()); storeState(); + + // Note that the valve has not been left open due to data being queued. This prevents an Input Port from closing the valve + // when the data is no longer queued, but while the Output Port is still processing the data. 
+ leftOpenDueToDataQueued = false; return true; } @@ -209,6 +232,9 @@ public class StandardDataValve implements DataValve { final boolean dataQueued = sourceGroup.isDataQueued(); if (dataQueued) { logger.debug("Triggered to close flow of data out of group {} but group is not empty so will not close valve", sourceGroup); + + // Denote that the valve was left open due to data being queued. This way, we can close the valve when the data is no longer queued. + leftOpenDueToDataQueued = true; return; } @@ -231,10 +257,8 @@ public class StandardDataValve implements DataValve { final Map<String, List<ProcessGroup>> reasonOutputNotAllowed = new HashMap<>(); for (final ProcessGroup group : processGroup.getProcessGroups()) { if (group.getFlowFileConcurrency() == FlowFileConcurrency.SINGLE_BATCH_PER_NODE) { - String inputReason = getReasonFlowIntoGroupNotAllowed(group); - if (inputReason == null) { - inputReason = "Input is Allowed"; - } + final FlowInForbiddenReason forbiddenReason = getReasonFlowIntoGroupNotAllowed(group); + final String inputReason = forbiddenReason == null ? 
"Input is Allowed" : forbiddenReason.getExplanation(); final List<ProcessGroup> inputGroupsAffected = reasonInputNotAllowed.computeIfAbsent(inputReason, k -> new ArrayList<>()); inputGroupsAffected.add(group); @@ -290,7 +314,7 @@ public class StandardDataValve implements DataValve { return; } - if (!stateMap.getStateVersion().isPresent()) { + if (stateMap.getStateVersion().isEmpty()) { logger.debug("No state to recover for {}", this); return; } @@ -333,4 +357,20 @@ public class StandardDataValve implements DataValve { return "StandardDataValve[group=" + processGroup + "]"; } + public enum FlowInForbiddenReason { + DATA_QUEUED("Process Group already has data queued and valve is not already open to allow data to flow in"), + + OPEN_FOR_OUTPUT("Data Valve is already open to allow data to flow out of group"), + + SOURCE_FLOWING_OUT("Port has an incoming connection from a Process Group that is currently allowing data to flow out"); + + private final String explanation; + FlowInForbiddenReason(final String explanation) { + this.explanation = explanation; + } + + public String getExplanation() { + return explanation; + } + } }