NIFI-12228: This closes #7881. Fixed an issue with FlowFile Concurrency that could occasionally bring in more data than it should.

Code cleanup; fixed the logback configuration to avoid an INFO-level stack trace from Xodus.

Signed-off-by: Joseph Witt <joewitt@apache.org>
Mark Payne 2023-10-13 11:03:11 -04:00 committed by Joseph Witt
parent 96eb1d825a
commit 0eabbcdf19
7 changed files with 49 additions and 20 deletions


@@ -186,15 +186,9 @@ public class LocalPort extends AbstractPort {
             final FlowFileConcurrency flowFileConcurrency = getProcessGroup().getFlowFileConcurrency();
             switch (flowFileConcurrency) {
-                case UNBOUNDED:
-                    transferUnboundedConcurrency(context, session);
-                    break;
-                case SINGLE_FLOWFILE_PER_NODE:
-                    transferSingleFlowFile(session);
-                    break;
-                case SINGLE_BATCH_PER_NODE:
-                    transferInputBatch(session);
-                    break;
+                case UNBOUNDED -> transferUnboundedConcurrency(context, session);
+                case SINGLE_FLOWFILE_PER_NODE -> transferSingleFlowFile(session);
+                case SINGLE_BATCH_PER_NODE -> transferInputBatch(session);
             }
         } finally {
             flowFileGate.releaseClaim(this);
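
For readers unfamiliar with the arrow form, the refactoring above replaces the classic label-and-break switch with Java 14+ arrow labels, which cannot fall through. A minimal, self-contained sketch of the same dispatch pattern; the Concurrency enum and the printed messages are illustrative stand-ins, not NiFi's actual types:

public class ConcurrencyDispatchSketch {

    // Illustrative stand-in for NiFi's FlowFileConcurrency enum
    enum Concurrency { UNBOUNDED, SINGLE_FLOWFILE_PER_NODE, SINGLE_BATCH_PER_NODE }

    static void dispatch(final Concurrency concurrency) {
        // Arrow labels: each case maps to a single statement, no break needed and no fall-through possible
        switch (concurrency) {
            case UNBOUNDED -> System.out.println("transfer with unbounded concurrency");
            case SINGLE_FLOWFILE_PER_NODE -> System.out.println("transfer a single FlowFile");
            case SINGLE_BATCH_PER_NODE -> System.out.println("transfer the queued input batch");
        }
    }

    public static void main(final String[] args) {
        dispatch(Concurrency.SINGLE_BATCH_PER_NODE); // prints "transfer the queued input batch"
    }
}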


@@ -36,6 +36,16 @@ public class SingleConcurrencyFlowFileGate implements FlowFileGate {
             return false;
         }
 
+        // We need to try to open flow into the Port's group. To do this, we need to get the data valve for the parent group,
+        // as it is responsible for data flowing into and out of its children.
+        final ProcessGroup dataValveGroup = port.getProcessGroup().getParent();
+        final DataValve dataValve = dataValveGroup.getDataValve();
+        final boolean openFlowIntoGroup = dataValve.tryOpenFlowIntoGroup(port.getProcessGroup());
+        if (!openFlowIntoGroup) {
+            claimed.set(false);
+            return false;
+        }
+
         // The claim is now held by this thread. Check if the ProcessGroup is empty.
         final boolean empty = !port.getProcessGroup().isDataQueued();
         if (empty) {
@@ -43,6 +53,9 @@ public class SingleConcurrencyFlowFileGate implements FlowFileGate {
             return true;
         }
 
+        // We have already opened flow into group, so now we must close it, since we are not allowing flow in
+        dataValve.closeFlowIntoGroup(port.getProcessGroup());
+
         // Process Group was not empty, so we cannot allow any more FlowFiles through. Reset claimed to false and return false,
         // indicating that the caller did not obtain the claim.
         claimed.set(false);
@@ -52,5 +65,9 @@ public class SingleConcurrencyFlowFileGate implements FlowFileGate {
     @Override
     public void releaseClaim(final Port port) {
         claimed.set(false);
+
+        final ProcessGroup dataValveGroup = port.getProcessGroup().getParent();
+        final DataValve dataValve = dataValveGroup.getDataValve();
+        dataValve.closeFlowIntoGroup(port.getProcessGroup());
     }
 }
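
The gate change above pairs every successful claim with an opened valve and every release with a close, so a batch can only be let in while the claim is held. A simplified sketch of that claim/open and release/close pairing, using an AtomicBoolean and a hypothetical Valve interface in place of NiFi's Port, ProcessGroup, and DataValve; this is not the actual implementation:

import java.util.concurrent.atomic.AtomicBoolean;

public class SingleConcurrencyGateSketch {

    // Hypothetical stand-in for NiFi's DataValve
    interface Valve {
        boolean tryOpenFlowIntoGroup();
        void closeFlowIntoGroup();
    }

    private final AtomicBoolean claimed = new AtomicBoolean(false);

    public boolean tryClaim(final Valve valve, final boolean groupHasDataQueued) {
        // Only one caller may hold the claim at a time
        if (!claimed.compareAndSet(false, true)) {
            return false;
        }

        // Ask the valve (owned by the parent group) whether data may flow into the group
        if (!valve.tryOpenFlowIntoGroup()) {
            claimed.set(false);
            return false;
        }

        if (!groupHasDataQueued) {
            // Group is empty: keep the claim and leave the valve open for this batch
            return true;
        }

        // Group still has data queued: undo the open and give up the claim
        valve.closeFlowIntoGroup();
        claimed.set(false);
        return false;
    }

    public void releaseClaim(final Valve valve) {
        claimed.set(false);
        // Mirrors the change in this commit: releasing the claim also closes flow into the group
        valve.closeFlowIntoGroup();
    }

    public static void main(final String[] args) {
        final SingleConcurrencyGateSketch gate = new SingleConcurrencyGateSketch();
        final Valve valve = new Valve() {
            @Override
            public boolean tryOpenFlowIntoGroup() { return true; }
            @Override
            public void closeFlowIntoGroup() { }
        };
        System.out.println(gate.tryClaim(valve, false)); // true: empty group, claim obtained and valve opened
        gate.releaseClaim(valve);
    }
}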


@@ -84,10 +84,15 @@ public class StandardDataValve implements DataValve {
         if (destinationGroup.isDataQueued()) {
             // If the destination group already has data queued up, and the valve is not already open, do not allow data to
             // flow into the group. If we did, we would end up mixing together two different batches of data.
-            logger.debug("Will not allow data to flow into {} because valve is not already open and the Process Group has data queued", destinationGroup);
+            logger.trace("Will not allow data to flow into {} because valve is not already open and the Process Group has data queued", destinationGroup);
             return "Process Group already has data queued and valve is not already allowing data into group";
         }
 
+        if (destinationGroup.getFlowFileOutboundPolicy() == FlowFileOutboundPolicy.BATCH_OUTPUT && groupsWithDataFlowingOut.contains(destinationGroup.getIdentifier())) {
+            logger.trace("Will not allow data to flow into {} because Outbound Policy is Batch Output and valve is already open to allow data to flow out of group", destinationGroup);
+            return "Data Valve is already allowing data to flow out of group";
+        }
+
         for (final Port port : destinationGroup.getInputPorts()) {
             for (final Connection connection : port.getIncomingConnections()) {
                 final Connectable sourceConnectable = connection.getSource();
@@ -102,7 +107,7 @@ public class StandardDataValve implements DataValve {
                 final boolean flowingOutOfSourceGroup = groupsWithDataFlowingOut.contains(sourceGroup.getIdentifier());
                 if (Boolean.TRUE.equals(flowingOutOfSourceGroup)) {
-                    logger.debug("Will not allow data to flow into {} because port {} has an incoming connection from {} and that Process Group is currently allowing data to flow out",
+                    logger.trace("Will not allow data to flow into {} because port {} has an incoming connection from {} and that Process Group is currently allowing data to flow out",
                         destinationGroup, port, sourceConnectable);
                     return "Source connected to Input Port is an Output Port with Batch Output and is currently allowing data to flow out";
                 }
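
Taken together, the checks above mean data may flow into a group only when the group has no data queued and, when the group uses Batch Output, only when the valve is not already letting data flow out. A rough sketch of that decision, with simplified stand-in types and names chosen for illustration; the rejection reasons are modeled as strings, as in the method shown in the diff:

import java.util.Set;

public class FlowIntoGroupCheckSketch {

    // Illustrative stand-in for NiFi's FlowFileOutboundPolicy
    enum OutboundPolicy { STREAM_WHEN_AVAILABLE, BATCH_OUTPUT }

    // Returns null when flow into the group is allowed, otherwise a human-readable rejection reason
    static String whyNotOpenFlowInto(final boolean dataQueued,
                                     final OutboundPolicy outboundPolicy,
                                     final String groupId,
                                     final Set<String> groupsWithDataFlowingOut) {
        if (dataQueued) {
            // Letting more data in now would mix two different batches together
            return "Process Group already has data queued and valve is not already allowing data into group";
        }
        if (outboundPolicy == OutboundPolicy.BATCH_OUTPUT && groupsWithDataFlowingOut.contains(groupId)) {
            // The group is still draining its previous batch out; do not start a new one
            return "Data Valve is already allowing data to flow out of group";
        }
        return null;
    }

    public static void main(final String[] args) {
        // Rejected: the group is still flowing data out
        System.out.println(whyNotOpenFlowInto(false, OutboundPolicy.BATCH_OUTPUT, "group-1", Set.of("group-1")));
        // Allowed (prints null): nothing queued and nothing flowing out
        System.out.println(whyNotOpenFlowInto(false, OutboundPolicy.BATCH_OUTPUT, "group-1", Set.of()));
    }
}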
@@ -119,13 +124,15 @@ public class StandardDataValve implements DataValve {
             return;
         }
 
-        for (final Port port : destinationGroup.getInputPorts()) {
-            for (final Connection connection : port.getIncomingConnections()) {
-                if (!connection.getFlowFileQueue().isEmpty()) {
-                    logger.debug("Triggered to close flow of data into group {} but Input Port has incoming Connection {}, which is not empty, so will not close valve",
-                        destinationGroup, connection);
+        if (destinationGroup.getFlowFileConcurrency() == FlowFileConcurrency.SINGLE_BATCH_PER_NODE) {
+            for (final Port port : destinationGroup.getInputPorts()) {
+                for (final Connection connection : port.getIncomingConnections()) {
+                    if (!connection.getFlowFileQueue().isEmpty()) {
+                        logger.debug("Triggered to close flow of data into group {} but Input Port has incoming Connection {}, which is not empty, so will not close valve",
+                            destinationGroup, connection);
-                    return;
+                        return;
+                    }
                 }
             }
         }
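
The guard added here is the heart of the fix: the "do not close the valve while an input connection is still non-empty" rule now applies only when the group runs with SINGLE_BATCH_PER_NODE concurrency, so other modes no longer hold the valve open and pull in extra data. A sketch of that guard with simplified stand-in types; these are not NiFi's actual classes:

import java.util.List;

public class CloseFlowIntoGroupSketch {

    enum Concurrency { UNBOUNDED, SINGLE_FLOWFILE_PER_NODE, SINGLE_BATCH_PER_NODE }

    record Connection(String name, boolean empty) { }

    static boolean mayCloseFlowInto(final Concurrency concurrency, final List<Connection> incomingConnections) {
        if (concurrency == Concurrency.SINGLE_BATCH_PER_NODE) {
            for (final Connection connection : incomingConnections) {
                if (!connection.empty()) {
                    // A batch is still being pulled in; keep the valve open for now
                    return false;
                }
            }
        }
        // Other concurrency modes, or all incoming connections drained: the valve may close
        return true;
    }

    public static void main(final String[] args) {
        System.out.println(mayCloseFlowInto(Concurrency.SINGLE_BATCH_PER_NODE,
                List.of(new Connection("input", false))));   // false: connection still has data
        System.out.println(mayCloseFlowInto(Concurrency.UNBOUNDED,
                List.of(new Connection("input", false))));   // true: guard only applies to batch mode
    }
}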
@@ -175,14 +182,14 @@
                 }
 
                 if (!connection.getFlowFileQueue().isEmpty()) {
-                    logger.debug("Not allowing data to flow out of {} because {} has a destination of {}, which has data queued and its Process Group is "
+                    logger.trace("Not allowing data to flow out of {} because {} has a destination of {}, which has data queued and its Process Group is "
                         + "configured with a FlowFileConcurrency of Batch Per Node.", sourceGroup, port, connection);
                     return "Output Connection already has data queued";
                 }
 
                 final boolean dataFlowingIntoDestination = groupsWithDataFlowingIn.contains(destinationProcessGroup.getIdentifier());
                 if (dataFlowingIntoDestination) {
-                    logger.debug("Not allowing data to flow out of {} because {} has a destination of {}, and its Process Group is "
+                    logger.trace("Not allowing data to flow out of {} because {} has a destination of {}, and its Process Group is "
                         + "currently allowing data to flow in", sourceGroup, port, connection);
                     return "Destination Process Group is allowing data to flow in";
                 }
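
Across these hunks the valve tracks which groups currently have data flowing in and which have data flowing out, and refuses to open flow in one direction while the other is active, so two batches never mix. A heavily simplified sketch of that bookkeeping; in the real valve the flow-out check inspects the destination groups of the output connections, which is collapsed here to a single group id, and all names are illustrative:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class DataValveSketch {

    private final Set<String> groupsWithDataFlowingIn = ConcurrentHashMap.newKeySet();
    private final Set<String> groupsWithDataFlowingOut = ConcurrentHashMap.newKeySet();

    public synchronized boolean tryOpenFlowIntoGroup(final String groupId) {
        if (groupsWithDataFlowingOut.contains(groupId)) {
            // The group is still draining a batch out; opening flow in would mix batches
            return false;
        }
        return groupsWithDataFlowingIn.add(groupId);
    }

    public synchronized boolean tryOpenFlowOutOfGroup(final String groupId) {
        if (groupsWithDataFlowingIn.contains(groupId)) {
            // Data is still flowing in; do not let a partial batch leak out
            return false;
        }
        return groupsWithDataFlowingOut.add(groupId);
    }

    public synchronized void closeFlowIntoGroup(final String groupId) {
        groupsWithDataFlowingIn.remove(groupId);
    }

    public synchronized void closeFlowOutOfGroup(final String groupId) {
        groupsWithDataFlowingOut.remove(groupId);
    }
}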


@@ -148,6 +148,9 @@
     <!-- Suppress non-error messages from Apache Atlas which was emitting large amounts of INFO logs by default -->
     <logger name="org.apache.atlas" level="WARN"/>
 
+    <!-- Suppress non-error messages from JetBrains Xodus FileDataWriter related to FileChannel -->
+    <logger name="jetbrains.exodus.io.FileDataWriter" level="WARN" />
+
     <!-- These log messages would normally go to the USER_FILE log, but they belong in the APP_FILE -->
     <logger name="org.apache.nifi.web.security.requests" level="INFO" additivity="false">
         <appender-ref ref="APP_FILE"/>


@@ -148,6 +148,9 @@
     <!-- Suppress non-error messages from Apache Atlas which was emitting large amounts of INFO logs by default -->
     <logger name="org.apache.atlas" level="WARN"/>
 
+    <!-- Suppress non-error messages from JetBrains Xodus FileDataWriter related to FileChannel -->
+    <logger name="jetbrains.exodus.io.FileDataWriter" level="WARN" />
+
     <!-- These log messages would normally go to the USER_FILE log, but they belong in the APP_FILE -->
     <logger name="org.apache.nifi.web.security.requests" level="INFO" additivity="false">
         <appender-ref ref="APP_FILE"/>


@ -102,7 +102,6 @@
<logger name="org.apache.nifi.processors.standard.LogMessage" level="INFO"/>
<logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="WARN" />
<logger name="org.apache.zookeeper.ClientCnxn" level="ERROR" />
<logger name="org.apache.zookeeper.server.NIOServerCnxn" level="ERROR" />
<logger name="org.apache.zookeeper.server.NIOServerCnxnFactory" level="ERROR" />
@@ -149,6 +148,9 @@
     <!-- Suppress non-error messages from Apache Atlas which was emitting large amounts of INFO logs by default -->
     <logger name="org.apache.atlas" level="WARN"/>
 
+    <!-- Suppress non-error messages from JetBrains Xodus FileDataWriter related to FileChannel -->
+    <logger name="jetbrains.exodus.io.FileDataWriter" level="WARN" />
+
     <!-- These log messages would normally go to the USER_FILE log, but they belong in the APP_FILE -->
     <logger name="org.apache.nifi.web.security.requests" level="INFO" additivity="false">
         <appender-ref ref="APP_FILE"/>


@@ -149,6 +149,9 @@
     <!-- Suppress non-error messages from Apache Atlas which was emitting large amounts of INFO logs by default -->
     <logger name="org.apache.atlas" level="WARN"/>
 
+    <!-- Suppress non-error messages from JetBrains Xodus FileDataWriter related to FileChannel -->
+    <logger name="jetbrains.exodus.io.FileDataWriter" level="WARN" />
+
     <!-- These log messages would normally go to the USER_FILE log, but they belong in the APP_FILE -->
     <logger name="org.apache.nifi.web.security.requests" level="INFO" additivity="false">
         <appender-ref ref="APP_FILE"/>