NIFI-443: Always start funnels when added to process group, even when autoResumeState is false

This commit is contained in:
Mark Payne 2015-03-19 11:04:46 -04:00
parent eb5ec703ba
commit 8b911c5aab
6 changed files with 90 additions and 109 deletions

View File

@ -118,7 +118,7 @@ public interface ProcessGroup {
void stopProcessing();
/**
* Starts the given Processor
* Enables the given Processor
*
* @param processor the processor to start
* @throws IllegalStateException if the processor is not valid, or is
@ -127,25 +127,19 @@ public interface ProcessGroup {
void enableProcessor(ProcessorNode processor);
/**
* Starts the given Input Port
* Enables the given Input Port
*
* @param port
*/
void enableInputPort(Port port);
/**
* Starts the given Output Port
* Enables the given Output Port
*
* @param port
*/
void enableOutputPort(Port port);
/**
* Starts the given Funnel
*
* @param funnel
*/
void enableFunnel(Funnel funnel);
/**
* Starts the given Processor
@ -206,7 +200,7 @@ public interface ProcessGroup {
void stopFunnel(Funnel funnel);
/**
* Starts the given Processor
* Disables the given Processor
*
* @param processor the processor to start
* @throws IllegalStateException if the processor is not valid, or is
@ -215,25 +209,19 @@ public interface ProcessGroup {
void disableProcessor(ProcessorNode processor);
/**
* Starts the given Input Port
* Disables the given Input Port
*
* @param port
*/
void disableInputPort(Port port);
/**
* Starts the given Output Port
* Disables the given Output Port
*
* @param port
*/
void disableOutputPort(Port port);
/**
* Starts the given Funnel
*
* @param funnel
*/
void disableFunnel(Funnel funnel);
/**
* Indicates that the Flow is being shutdown; allows cleanup of resources
@ -618,11 +606,23 @@ public interface ProcessGroup {
Port getOutputPortByName(String name);
/**
* Adds the given funnel to this ProcessGroup
* Adds the given funnel to this ProcessGroup and starts it. While other components
* do not automatically start, the funnel does by default because it is intended to be
* more of a notional component that users are unable to explicitly start and stop.
* However, there is an override available in {@link #addFunnel(Funnel, boolean)} because
* we may need to avoid starting the funnel on restart until the flow is completely
* initialized.
*
* @param funnel
*/
void addFunnel(Funnel funnel);
/**
* Adds the given funnel to this ProcessGroup and optionally starts the funnel.
* @param funnel
* @param autoStart
*/
void addFunnel(Funnel funnel, boolean autoStart);
/**
* Returns a Set of all Funnels that belong to this ProcessGroup

View File

@ -576,40 +576,57 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
* flag of true to now start
* </p>
*/
public void startDelayed() {
public void onFlowInitialized(final boolean startDelayedComponents) {
writeLock.lock();
try {
LOG.info("Starting {} processors/ports/funnels", (startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size()));
for (final Connectable connectable : startConnectablesAfterInitialization) {
if (connectable.getScheduledState() == ScheduledState.DISABLED) {
continue;
}
try {
if (connectable instanceof ProcessorNode) {
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable);
} else {
startConnectable(connectable);
if ( startDelayedComponents ) {
LOG.info("Starting {} processors/ports/funnels", (startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size()));
for (final Connectable connectable : startConnectablesAfterInitialization) {
if (connectable.getScheduledState() == ScheduledState.DISABLED) {
continue;
}
try {
if (connectable instanceof ProcessorNode) {
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable);
} else {
startConnectable(connectable);
}
} catch (final Throwable t) {
LOG.error("Unable to start {} due to {}", new Object[]{connectable, t});
}
} catch (final Throwable t) {
LOG.error("Unable to start {} due to {}", new Object[]{connectable, t});
}
}
startConnectablesAfterInitialization.clear();
int startedTransmitting = 0;
for (final RemoteGroupPort remoteGroupPort : startRemoteGroupPortsAfterInitialization) {
try {
remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort);
startedTransmitting++;
} catch (final Throwable t) {
LOG.error("Unable to start transmitting with {} due to {}", new Object[]{remoteGroupPort, t});
startConnectablesAfterInitialization.clear();
int startedTransmitting = 0;
for (final RemoteGroupPort remoteGroupPort : startRemoteGroupPortsAfterInitialization) {
try {
remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort);
startedTransmitting++;
} catch (final Throwable t) {
LOG.error("Unable to start transmitting with {} due to {}", new Object[]{remoteGroupPort, t});
}
}
LOG.info("Started {} Remote Group Ports transmitting", startedTransmitting);
startRemoteGroupPortsAfterInitialization.clear();
} else {
// We don't want to start all of the delayed components. However, funnels need to be started anyway
// because we don't provide users the ability to start or stop them - they are just notional.
for (final Connectable connectable : startConnectablesAfterInitialization) {
try {
if (connectable instanceof Funnel) {
startConnectable(connectable);
}
} catch (final Throwable t) {
LOG.error("Unable to start {} due to {}", new Object[]{connectable, t});
}
}
startConnectablesAfterInitialization.clear();
startRemoteGroupPortsAfterInitialization.clear();
}
LOG.info("Started {} Remote Group Ports transmitting", startedTransmitting);
startRemoteGroupPortsAfterInitialization.clear();
} finally {
writeLock.unlock();
}

View File

@ -423,18 +423,15 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
*/
controller.startHeartbeating();
// if configured, start all components
if (autoResumeState) {
try {
controller.startDelayed();
} catch (final Exception ex) {
logger.warn("Unable to start all processors due to invalid flow configuration.");
if (logger.isDebugEnabled()) {
logger.warn(StringUtils.EMPTY, ex);
}
// notify controller that flow is initialized
try {
controller.onFlowInitialized(autoResumeState);
} catch (final Exception ex) {
logger.warn("Unable to start all processors due to invalid flow configuration.");
if (logger.isDebugEnabled()) {
logger.warn(StringUtils.EMPTY, ex);
}
}
} else {
try {
loadFromConnectionResponse(response);
@ -732,9 +729,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
controller.setPrimary(response.isPrimary());
// start the processors as indicated by the dataflow
if (dataFlow.isAutoStartProcessors()) {
controller.startDelayed();
}
controller.onFlowInitialized(dataFlow.isAutoStartProcessors());
loadTemplates(dataFlow.getTemplates());
loadSnippets(dataFlow.getSnippets());

View File

@ -692,7 +692,11 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final FunnelDTO funnelDTO = FlowFromDOMFactory.getFunnel(funnelElement);
final Funnel funnel = controller.createFunnel(funnelDTO.getId());
funnel.setPosition(toPosition(funnelDTO.getPosition()));
processGroup.addFunnel(funnel);
// Since this is called during startup, we want to add the funnel without enabling it
// and then tell the controller to enable it. This way, if the controller is not fully
// initialized, the starting of the funnel is delayed until the controller is ready.
processGroup.addFunnel(funnel, false);
controller.startConnectable(funnel);
}

View File

@ -1036,9 +1036,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
final ScheduledState state = funnel.getScheduledState();
if (state == ScheduledState.DISABLED) {
throw new IllegalStateException("Funnel is disabled");
} else if (state == ScheduledState.RUNNING) {
if (state == ScheduledState.RUNNING) {
return;
}
scheduler.startFunnel(funnel);
@ -1131,27 +1129,6 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
@Override
public void enableFunnel(final Funnel funnel) {
readLock.lock();
try {
if (!funnels.containsKey(funnel.getIdentifier())) {
throw new IllegalStateException("No Funnel with ID " + funnel.getIdentifier() + " belongs to this Process Group");
}
final ScheduledState state = funnel.getScheduledState();
if (state == ScheduledState.STOPPED) {
return;
} else if (state == ScheduledState.RUNNING) {
throw new IllegalStateException("Funnel is currently running");
}
scheduler.enableFunnel(funnel);
} finally {
readLock.unlock();
}
}
@Override
public void enableInputPort(final Port port) {
readLock.lock();
@ -1215,26 +1192,6 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
@Override
public void disableFunnel(final Funnel funnel) {
readLock.lock();
try {
if (!funnels.containsKey(funnel.getIdentifier())) {
throw new IllegalStateException("No Funnel with ID " + funnel.getIdentifier() + " belongs to this Process Group");
}
final ScheduledState state = funnel.getScheduledState();
if (state == ScheduledState.DISABLED) {
return;
} else if (state == ScheduledState.RUNNING) {
throw new IllegalStateException("Funnel is currently running");
}
scheduler.disableFunnel(funnel);
} finally {
readLock.unlock();
}
}
@Override
public void disableInputPort(final Port port) {
@ -1546,8 +1503,14 @@ public final class StandardProcessGroup implements ProcessGroup {
return null;
}
@Override
public void addFunnel(final Funnel funnel) {
addFunnel(funnel, true);
}
@Override
public void addFunnel(final Funnel funnel, final boolean autoStart) {
writeLock.lock();
try {
final Funnel existing = funnels.get(requireNonNull(funnel).getIdentifier());
@ -1557,6 +1520,10 @@ public final class StandardProcessGroup implements ProcessGroup {
funnel.setProcessGroup(this);
funnels.put(funnel.getIdentifier(), funnel);
if ( autoStart ) {
startFunnel(funnel);
}
} finally {
writeLock.unlock();
}

View File

@ -91,10 +91,8 @@ public class ApplicationStartupContextListener implements ServletContextListener
* reloading actions, the node will start the necessary
* processors.
*/
if (properties.getAutoResumeState()) {
final FlowController flowController = flowService.getController();
flowController.startDelayed();
}
final FlowController flowController = flowService.getController();
flowController.onFlowInitialized(properties.getAutoResumeState());
logger.info("Flow Controller started successfully.");
}