NIFI-13340 Fixed a bug in which an Output Port can leave a Process Group's DataValve open for output, but then the last FlowFile is terminated instead of going to an Output Port, ultimately resulting in the DataValve remaining open indefinitely. Now, this will be detected and the valve will be closed.

This closes #8951

Signed-off-by: David Handermann <>
This commit is contained in:
Mark Payne 2024-06-10 16:57:08 -04:00 committed by exceptionfactory
parent cbe52a3763
commit 039cd2f18a
No known key found for this signature in database

View File

@ -46,8 +46,11 @@ public class StandardDataValve implements DataValve {
private final ProcessGroup processGroup;
private final StateManager stateManager;
private Set<String> groupsWithDataFlowingIn = new HashSet<>();
private Set<String> groupsWithDataFlowingOut = new HashSet<>();
private final Set<String> groupsWithDataFlowingIn = new HashSet<>();
private final Set<String> groupsWithDataFlowingOut = new HashSet<>();
private boolean leftOpenDueToDataQueued = false;
public StandardDataValve(final ProcessGroup processGroup, final StateManager stateManager) {
this.processGroup = processGroup;
@ -67,30 +70,46 @@ public class StandardDataValve implements DataValve {
return true;
final String reasonForNotAllowing = getReasonFlowIntoGroupNotAllowed(destinationGroup);
final FlowInForbiddenReason reasonForNotAllowing = getReasonFlowIntoGroupNotAllowed(destinationGroup);
// If we are forbidding data to flow into the group due to the fact that data is currently allowed to flow out of the group,
// and the valve was left open due to data being queued, let's verify that there is actually data queued up to flow out at the moment.
// If there is not, remove the group from those that are currently allowing data to flow out. This can happen in the following situation:
// - A FlowFile comes into the group
// - The FlowFile is split into two FlowFiles
// - One of the FlowFiles is routed to the Output Port, while the other is routed elsewhere
// - The Output Port is triggered. It opens the valve to allow data to flow out of the group.
// - The Output Port goes to close flow out of the group. However, the group is not empty, so the valve is not closed.
// - The other FlowFile is never routed to an output port. Instead, it is auto-terminated by some processor.
// Now, the valve has been left open.
// In this case, though, when the Output Port failed to close the valve, this.leftOpenDueToDataQueued was set to true. If that is the case,
// we can go ahead and close the valve now, if there's no more data queued.
if (reasonForNotAllowing == FlowInForbiddenReason.OPEN_FOR_OUTPUT && leftOpenDueToDataQueued && !destinationGroup.isDataQueued()) {
if (reasonForNotAllowing != null) {
// Since there is a reason not to allow it, return false. The reason has already been logged at a DEBUG level.
return false;
logger.debug("Opening valve to allow data to flow into {}", destinationGroup);
return true;
private String getReasonFlowIntoGroupNotAllowed(final ProcessGroup destinationGroup) {
private FlowInForbiddenReason getReasonFlowIntoGroupNotAllowed(final ProcessGroup destinationGroup) {
if (destinationGroup.isDataQueued()) {
// If the destination group already has data queued up, and the valve is not already open, do not allow data to
// flow into the group. If we did, we would end up mixing together two different batches of data.
logger.trace("Will not allow data to flow into {} because valve is not already open and the Process Group has data queued", destinationGroup);
return "Process Group already has data queued and valve is not already allowing data into group";
return FlowInForbiddenReason.DATA_QUEUED;
if (destinationGroup.getFlowFileOutboundPolicy() == FlowFileOutboundPolicy.BATCH_OUTPUT && groupsWithDataFlowingOut.contains(destinationGroup.getIdentifier())) {
logger.trace("Will not allow data to flow into {} because Outbound Policy is Batch Output and valve is already open to allow data to flow out of group", destinationGroup);
return "Data Valve is already allowing data to flow out of group";
return FlowInForbiddenReason.OPEN_FOR_OUTPUT;
for (final Port port : destinationGroup.getInputPorts()) {
@ -109,7 +128,7 @@ public class StandardDataValve implements DataValve {
if (Boolean.TRUE.equals(flowingOutOfSourceGroup)) {
logger.trace("Will not allow data to flow into {} because port {} has an incoming connection from {} and that Process Group is currently allowing data to flow out",
destinationGroup, port, sourceConnectable);
return "Source connected to Input Port is an Output Port with Batch Output and is currently allowing data to flow out";
return FlowInForbiddenReason.SOURCE_FLOWING_OUT;
@ -161,6 +180,10 @@ public class StandardDataValve implements DataValve {
logger.debug("Opening valve to allow data to flow out of {}", sourceGroup);
// Note that the valve has not been left open due to data being queued. This prevents an Input Port from closing the valve
// when the data is no longer queued, but while the Output Port is still processing the data.
leftOpenDueToDataQueued = false;
return true;
@ -209,6 +232,9 @@ public class StandardDataValve implements DataValve {
final boolean dataQueued = sourceGroup.isDataQueued();
if (dataQueued) {
logger.debug("Triggered to close flow of data out of group {} but group is not empty so will not close valve", sourceGroup);
// Denote that the valve was left open due to data being queued. This way, we can close the valve when the data is no longer queued.
leftOpenDueToDataQueued = true;
@ -231,10 +257,8 @@ public class StandardDataValve implements DataValve {
final Map<String, List<ProcessGroup>> reasonOutputNotAllowed = new HashMap<>();
for (final ProcessGroup group : processGroup.getProcessGroups()) {
if (group.getFlowFileConcurrency() == FlowFileConcurrency.SINGLE_BATCH_PER_NODE) {
String inputReason = getReasonFlowIntoGroupNotAllowed(group);
if (inputReason == null) {
inputReason = "Input is Allowed";
final FlowInForbiddenReason forbiddenReason = getReasonFlowIntoGroupNotAllowed(group);
final String inputReason = forbiddenReason == null ? "Input is Allowed" : forbiddenReason.getExplanation();
final List<ProcessGroup> inputGroupsAffected = reasonInputNotAllowed.computeIfAbsent(inputReason, k -> new ArrayList<>());
@ -290,7 +314,7 @@ public class StandardDataValve implements DataValve {
if (!stateMap.getStateVersion().isPresent()) {
if (stateMap.getStateVersion().isEmpty()) {
logger.debug("No state to recover for {}", this);
@ -333,4 +357,20 @@ public class StandardDataValve implements DataValve {
return "StandardDataValve[group=" + processGroup + "]";
public enum FlowInForbiddenReason {
DATA_QUEUED("Process Group already has data queued and valve is not already open to allow data to flow in"),
OPEN_FOR_OUTPUT("Data Valve is already open to allow data to flow out of group"),
SOURCE_FLOWING_OUT("Port has an incoming connection from a Process Group that is currently allowing data to flow out");
private final String explanation;
FlowInForbiddenReason(final String explanation) {
this.explanation = explanation;
public String getExplanation() {
return explanation;