From 8b911c5aab2a4b8283510a3423e3c8962a533b96 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 19 Mar 2015 11:04:46 -0400 Subject: [PATCH] NIFI-443: Always start funnels when added to process group, even when autoResumeState is false --- .../org/apache/nifi/groups/ProcessGroup.java | 38 +++++----- .../nifi/controller/FlowController.java | 73 ++++++++++++------- .../nifi/controller/StandardFlowService.java | 21 ++---- .../controller/StandardFlowSynchronizer.java | 6 +- .../nifi/groups/StandardProcessGroup.java | 55 +++----------- .../ApplicationStartupContextListener.java | 6 +- 6 files changed, 90 insertions(+), 109 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index 61be59c349..53b26e1787 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -118,7 +118,7 @@ public interface ProcessGroup { void stopProcessing(); /** - * Starts the given Processor + * Enables the given Processor * * @param processor the processor to start * @throws IllegalStateException if the processor is not valid, or is @@ -127,25 +127,19 @@ public interface ProcessGroup { void enableProcessor(ProcessorNode processor); /** - * Starts the given Input Port + * Enables the given Input Port * * @param port */ void enableInputPort(Port port); /** - * Starts the given Output Port + * Enables the given Output Port * * @param port */ void enableOutputPort(Port port); - /** - * Starts the given Funnel - * - * @param funnel - */ - void enableFunnel(Funnel funnel); /** * Starts the given Processor @@ -206,7 +200,7 @@ public interface ProcessGroup { void stopFunnel(Funnel funnel); /** - * Starts the given Processor + * Disables the given Processor * * @param processor the processor to start * @throws IllegalStateException if the processor is not valid, or is @@ -215,25 +209,19 @@ public interface ProcessGroup { void disableProcessor(ProcessorNode processor); /** - * Starts the given Input Port + * Disables the given Input Port * * @param port */ void disableInputPort(Port port); /** - * Starts the given Output Port + * Disables the given Output Port * * @param port */ void disableOutputPort(Port port); - /** - * Starts the given Funnel - * - * @param funnel - */ - void disableFunnel(Funnel funnel); /** * Indicates that the Flow is being shutdown; allows cleanup of resources @@ -618,11 +606,23 @@ public interface ProcessGroup { Port getOutputPortByName(String name); /** - * Adds the given funnel to this ProcessGroup + * Adds the given funnel to this ProcessGroup and starts it. While other components + * do not automatically start, the funnel does by default because it is intended to be + * more of a notional component that users are unable to explicitly start and stop. + * However, there is an override available in {@link #addFunnel(Funnel, boolean)} because + * we may need to avoid starting the funnel on restart until the flow is completely + * initialized. * * @param funnel */ void addFunnel(Funnel funnel); + + /** + * Adds the given funnel to this ProcessGroup and optionally starts the funnel. + * @param funnel + * @param autoStart + */ + void addFunnel(Funnel funnel, boolean autoStart); /** * Returns a Set of all Funnels that belong to this ProcessGroup diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 54f0807848..06ef203644 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -576,40 +576,57 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H * flag of true to now start *

*/ - public void startDelayed() { + public void onFlowInitialized(final boolean startDelayedComponents) { writeLock.lock(); try { - LOG.info("Starting {} processors/ports/funnels", (startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size())); - for (final Connectable connectable : startConnectablesAfterInitialization) { - if (connectable.getScheduledState() == ScheduledState.DISABLED) { - continue; - } - - try { - if (connectable instanceof ProcessorNode) { - connectable.getProcessGroup().startProcessor((ProcessorNode) connectable); - } else { - startConnectable(connectable); + if ( startDelayedComponents ) { + LOG.info("Starting {} processors/ports/funnels", (startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size())); + for (final Connectable connectable : startConnectablesAfterInitialization) { + if (connectable.getScheduledState() == ScheduledState.DISABLED) { + continue; + } + + try { + if (connectable instanceof ProcessorNode) { + connectable.getProcessGroup().startProcessor((ProcessorNode) connectable); + } else { + startConnectable(connectable); + } + } catch (final Throwable t) { + LOG.error("Unable to start {} due to {}", new Object[]{connectable, t}); } - } catch (final Throwable t) { - LOG.error("Unable to start {} due to {}", new Object[]{connectable, t}); } - } - - startConnectablesAfterInitialization.clear(); - - int startedTransmitting = 0; - for (final RemoteGroupPort remoteGroupPort : startRemoteGroupPortsAfterInitialization) { - try { - remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort); - startedTransmitting++; - } catch (final Throwable t) { - LOG.error("Unable to start transmitting with {} due to {}", new Object[]{remoteGroupPort, t}); + + startConnectablesAfterInitialization.clear(); + + int startedTransmitting = 0; + for (final RemoteGroupPort remoteGroupPort : startRemoteGroupPortsAfterInitialization) { + try { + remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort); + startedTransmitting++; + } catch (final Throwable t) { + LOG.error("Unable to start transmitting with {} due to {}", new Object[]{remoteGroupPort, t}); + } } + + LOG.info("Started {} Remote Group Ports transmitting", startedTransmitting); + startRemoteGroupPortsAfterInitialization.clear(); + } else { + // We don't want to start all of the delayed components. However, funnels need to be started anyway + // because we don't provide users the ability to start or stop them - they are just notional. + for (final Connectable connectable : startConnectablesAfterInitialization) { + try { + if (connectable instanceof Funnel) { + startConnectable(connectable); + } + } catch (final Throwable t) { + LOG.error("Unable to start {} due to {}", new Object[]{connectable, t}); + } + } + + startConnectablesAfterInitialization.clear(); + startRemoteGroupPortsAfterInitialization.clear(); } - - LOG.info("Started {} Remote Group Ports transmitting", startedTransmitting); - startRemoteGroupPortsAfterInitialization.clear(); } finally { writeLock.unlock(); } diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index d459b0047c..64ce5c4142 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -423,18 +423,15 @@ public class StandardFlowService implements FlowService, ProtocolHandler { */ controller.startHeartbeating(); - // if configured, start all components - if (autoResumeState) { - try { - controller.startDelayed(); - } catch (final Exception ex) { - logger.warn("Unable to start all processors due to invalid flow configuration."); - if (logger.isDebugEnabled()) { - logger.warn(StringUtils.EMPTY, ex); - } + // notify controller that flow is initialized + try { + controller.onFlowInitialized(autoResumeState); + } catch (final Exception ex) { + logger.warn("Unable to start all processors due to invalid flow configuration."); + if (logger.isDebugEnabled()) { + logger.warn(StringUtils.EMPTY, ex); } } - } else { try { loadFromConnectionResponse(response); @@ -732,9 +729,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { controller.setPrimary(response.isPrimary()); // start the processors as indicated by the dataflow - if (dataFlow.isAutoStartProcessors()) { - controller.startDelayed(); - } + controller.onFlowInitialized(dataFlow.isAutoStartProcessors()); loadTemplates(dataFlow.getTemplates()); loadSnippets(dataFlow.getSnippets()); diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index b60d18711f..05a8f019d8 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -692,7 +692,11 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final FunnelDTO funnelDTO = FlowFromDOMFactory.getFunnel(funnelElement); final Funnel funnel = controller.createFunnel(funnelDTO.getId()); funnel.setPosition(toPosition(funnelDTO.getPosition())); - processGroup.addFunnel(funnel); + + // Since this is called during startup, we want to add the funnel without enabling it + // and then tell the controller to enable it. This way, if the controller is not fully + // initialized, the starting of the funnel is delayed until the controller is ready. + processGroup.addFunnel(funnel, false); controller.startConnectable(funnel); } diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 856ccc1273..0025caa550 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -1036,9 +1036,7 @@ public final class StandardProcessGroup implements ProcessGroup { } final ScheduledState state = funnel.getScheduledState(); - if (state == ScheduledState.DISABLED) { - throw new IllegalStateException("Funnel is disabled"); - } else if (state == ScheduledState.RUNNING) { + if (state == ScheduledState.RUNNING) { return; } scheduler.startFunnel(funnel); @@ -1131,27 +1129,6 @@ public final class StandardProcessGroup implements ProcessGroup { } } - @Override - public void enableFunnel(final Funnel funnel) { - readLock.lock(); - try { - if (!funnels.containsKey(funnel.getIdentifier())) { - throw new IllegalStateException("No Funnel with ID " + funnel.getIdentifier() + " belongs to this Process Group"); - } - - final ScheduledState state = funnel.getScheduledState(); - if (state == ScheduledState.STOPPED) { - return; - } else if (state == ScheduledState.RUNNING) { - throw new IllegalStateException("Funnel is currently running"); - } - - scheduler.enableFunnel(funnel); - } finally { - readLock.unlock(); - } - } - @Override public void enableInputPort(final Port port) { readLock.lock(); @@ -1215,26 +1192,6 @@ public final class StandardProcessGroup implements ProcessGroup { } } - @Override - public void disableFunnel(final Funnel funnel) { - readLock.lock(); - try { - if (!funnels.containsKey(funnel.getIdentifier())) { - throw new IllegalStateException("No Funnel with ID " + funnel.getIdentifier() + " belongs to this Process Group"); - } - - final ScheduledState state = funnel.getScheduledState(); - if (state == ScheduledState.DISABLED) { - return; - } else if (state == ScheduledState.RUNNING) { - throw new IllegalStateException("Funnel is currently running"); - } - - scheduler.disableFunnel(funnel); - } finally { - readLock.unlock(); - } - } @Override public void disableInputPort(final Port port) { @@ -1546,8 +1503,14 @@ public final class StandardProcessGroup implements ProcessGroup { return null; } + @Override public void addFunnel(final Funnel funnel) { + addFunnel(funnel, true); + } + + @Override + public void addFunnel(final Funnel funnel, final boolean autoStart) { writeLock.lock(); try { final Funnel existing = funnels.get(requireNonNull(funnel).getIdentifier()); @@ -1557,6 +1520,10 @@ public final class StandardProcessGroup implements ProcessGroup { funnel.setProcessGroup(this); funnels.put(funnel.getIdentifier(), funnel); + + if ( autoStart ) { + startFunnel(funnel); + } } finally { writeLock.unlock(); } diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/contextlistener/ApplicationStartupContextListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/contextlistener/ApplicationStartupContextListener.java index a6316701e7..8b48abf323 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/contextlistener/ApplicationStartupContextListener.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/contextlistener/ApplicationStartupContextListener.java @@ -91,10 +91,8 @@ public class ApplicationStartupContextListener implements ServletContextListener * reloading actions, the node will start the necessary * processors. */ - if (properties.getAutoResumeState()) { - final FlowController flowController = flowService.getController(); - flowController.startDelayed(); - } + final FlowController flowController = flowService.getController(); + flowController.onFlowInitialized(properties.getAutoResumeState()); logger.info("Flow Controller started successfully."); }