mirror of https://github.com/apache/nifi.git
NIFI-443: If a funnel is given an incoming connection but has no outgoing connection, it continually runs and fails, logging a lot of ERROR messages
This commit is contained in:
parent
3a27c378cc
commit
42a4f90242
|
@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||||
import org.apache.nifi.connectable.Connectable;
|
import org.apache.nifi.connectable.Connectable;
|
||||||
|
import org.apache.nifi.connectable.ConnectableType;
|
||||||
import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
|
import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
|
||||||
import org.apache.nifi.controller.scheduling.ProcessContextFactory;
|
import org.apache.nifi.controller.scheduling.ProcessContextFactory;
|
||||||
import org.apache.nifi.controller.scheduling.ScheduleState;
|
import org.apache.nifi.controller.scheduling.ScheduleState;
|
||||||
|
@ -69,7 +70,9 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> {
|
||||||
final boolean triggerWhenEmpty = connectable.isTriggerWhenEmpty();
|
final boolean triggerWhenEmpty = connectable.isTriggerWhenEmpty();
|
||||||
boolean flowFilesQueued = true;
|
boolean flowFilesQueued = true;
|
||||||
final boolean shouldRun = (connectable.getYieldExpiration() < System.currentTimeMillis())
|
final boolean shouldRun = (connectable.getYieldExpiration() < System.currentTimeMillis())
|
||||||
&& (triggerWhenEmpty || (flowFilesQueued = Connectables.flowFilesQueued(connectable))) && (connectable.getRelationships().isEmpty() || Connectables.anyRelationshipAvailable(connectable));
|
&& (triggerWhenEmpty || (flowFilesQueued = Connectables.flowFilesQueued(connectable)))
|
||||||
|
&& (connectable.getConnectableType() != ConnectableType.FUNNEL || !connectable.getConnections().isEmpty())
|
||||||
|
&& (connectable.getRelationships().isEmpty() || Connectables.anyRelationshipAvailable(connectable));
|
||||||
|
|
||||||
if (shouldRun) {
|
if (shouldRun) {
|
||||||
scheduleState.incrementActiveThreadCount();
|
scheduleState.incrementActiveThreadCount();
|
||||||
|
|
Loading…
Reference in New Issue