From 43931b15043e4cb712c6e8dca19ebaa2a55796e8 Mon Sep 17 00:00:00 2001 From: Joe Gresock Date: Mon, 19 Sep 2022 10:38:00 -0400 Subject: [PATCH] NIFI-10518: Adding intended state to ScheduledStateChangeListener (#6428) - NIFI-10518: Adding intended state to ScheduledStateChangeListener - Notifying of scheduled state change when transitionComponentState is called - Notifying scheduled state change when reporting task state is changed - Notifying scheduledState changes for remote group port start/stop components calls --- ...tandardVersionedComponentSynchronizer.java | 78 ++++++++++++------- ...ardVersionedComponentSynchronizerTest.java | 8 +- .../groups/ScheduledStateChangeListener.java | 17 ++-- 3 files changed, 64 insertions(+), 39 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java index 47d4077bcb..09abfafd36 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java @@ -35,6 +35,7 @@ import org.apache.nifi.controller.PropertyConfiguration; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.Template; +import org.apache.nifi.controller.Triggerable; import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.queue.FlowFileQueue; @@ -1164,12 +1165,12 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen if (proposed != null && proposed.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) { // Re-enable the controller service if necessary serviceProvider.enableControllerServicesAsync(servicesToRestart); - notifyScheduledStateChange(servicesToRestart, synchronizationOptions); + notifyScheduledStateChange(servicesToRestart, synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED); // Restart any components that need to be restarted. if (controllerService != null) { serviceProvider.scheduleReferencingComponents(controllerService, referencesToRestart, context.getComponentScheduler()); - referencesToRestart.forEach(componentNode -> notifyScheduledStateChange(componentNode, synchronizationOptions)); + referencesToRestart.forEach(componentNode -> notifyScheduledStateChange(componentNode, synchronizationOptions, org.apache.nifi.flow.ScheduledState.RUNNING)); } } } @@ -1499,7 +1500,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen for (final ComponentNode stoppedComponent : componentsToRestart) { if (stoppedComponent instanceof Connectable) { context.getComponentScheduler().startComponent((Connectable) stoppedComponent); - notifyScheduledStateChange(stoppedComponent, synchronizationOptions); + notifyScheduledStateChange(stoppedComponent, synchronizationOptions, org.apache.nifi.flow.ScheduledState.RUNNING); } } } @@ -1555,7 +1556,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen // Disable all Controller Services final Collection controllerServices = processGroup.findAllControllerServices(); final Future disableServicesFuture = context.getControllerServiceProvider().disableControllerServicesAsync(controllerServices); - notifyScheduledStateChange(controllerServices, synchronizationOptions); + notifyScheduledStateChange(controllerServices, synchronizationOptions, org.apache.nifi.flow.ScheduledState.DISABLED); try { disableServicesFuture.get(timeout, TimeUnit.MILLISECONDS); } catch (final ExecutionException ee) { @@ -1658,7 +1659,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen // Stop all necessary enabled/active Controller Services final Future serviceDisableFuture = context.getControllerServiceProvider().disableControllerServicesAsync(controllerServicesToStop); - notifyScheduledStateChange(controllerServicesToStop, synchronizationOptions); + notifyScheduledStateChange(controllerServicesToStop, synchronizationOptions, org.apache.nifi.flow.ScheduledState.DISABLED); try { serviceDisableFuture.get(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } catch (ExecutionException e) { @@ -1686,11 +1687,11 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen } finally { // Re-enable all Controller Services that we disabled and restart all processors context.getControllerServiceProvider().enableControllerServicesAsync(controllerServicesToStop); - notifyScheduledStateChange(controllerServicesToStop, synchronizationOptions); + notifyScheduledStateChange(controllerServicesToStop, synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED); for (final ProcessorNode processor : processorsToStop) { processor.getProcessGroup().startProcessor(processor, false); - notifyScheduledStateChange((ComponentNode) processor,synchronizationOptions); + notifyScheduledStateChange((ComponentNode) processor,synchronizationOptions, org.apache.nifi.flow.ScheduledState.RUNNING); } } } finally { @@ -2257,7 +2258,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen private void startComponents(final Collection stoppedComponents, final FlowSynchronizationOptions synchronizationOptions) { for (final Connectable stoppedComponent : stoppedComponents) { context.getComponentScheduler().startComponent(stoppedComponent); - notifyScheduledStateChange(stoppedComponent, synchronizationOptions); + notifyScheduledStateChange(stoppedComponent, synchronizationOptions, org.apache.nifi.flow.ScheduledState.RUNNING); } } @@ -2269,6 +2270,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen port.setMaxConcurrentTasks(proposed.getConcurrentlySchedulableTaskCount()); context.getComponentScheduler().transitionComponentState(port, proposed.getScheduledState()); + notifyScheduledStateChange(port, syncOptions, proposed.getScheduledState()); } private Port addInputPort(final ProcessGroup destination, final VersionedPort proposed, final ComponentIdGenerator componentIdGenerator, final String temporaryName) { @@ -2490,45 +2492,62 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen return stoppedComponents; } - private void notifyScheduledStateChange(final Connectable component, final FlowSynchronizationOptions synchronizationOptions) { + private void notifyScheduledStateChange(final Connectable component, final FlowSynchronizationOptions synchronizationOptions, final org.apache.nifi.flow.ScheduledState intendedState) { try { if (component instanceof ProcessorNode) { - synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ProcessorNode) component); + synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ProcessorNode) component, intendedState); } else if (component instanceof Port) { - synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((Port) component); + synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((Port) component, intendedState); } } catch (final Exception e) { LOG.debug("Failed to notify listeners of ScheduledState changes", e); } } - private void notifyScheduledStateChange(final ComponentNode component, final FlowSynchronizationOptions synchronizationOptions) { + private void notifyScheduledStateChange(final ComponentNode component, final FlowSynchronizationOptions synchronizationOptions, final org.apache.nifi.flow.ScheduledState intendedState) { + if (component instanceof Triggerable && intendedState == org.apache.nifi.flow.ScheduledState.RUNNING && ((Triggerable) component).getScheduledState() == ScheduledState.DISABLED) { + return; + } try { if (component instanceof ProcessorNode) { - synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ProcessorNode) component); + synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ProcessorNode) component, intendedState); } else if (component instanceof Port) { - synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((Port) component); + synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((Port) component, intendedState); } else if (component instanceof ControllerServiceNode) { - synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ControllerServiceNode) component); + synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ControllerServiceNode) component, intendedState); } else if (component instanceof ReportingTaskNode) { - synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ReportingTaskNode) component); + final ReportingTaskNode reportingTaskNode = (ReportingTaskNode) component; + if (intendedState == org.apache.nifi.flow.ScheduledState.RUNNING && reportingTaskNode.getScheduledState() == ScheduledState.DISABLED) { + return; + } + synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(reportingTaskNode, intendedState); } } catch (final Exception e) { LOG.debug("Failed to notify listeners of ScheduledState changes", e); } } - private void notifyScheduledStateChange(final Collection servicesToRestart, final FlowSynchronizationOptions synchronizationOptions) { + private void notifyScheduledStateChange(final Collection servicesToRestart, final FlowSynchronizationOptions synchronizationOptions, + final org.apache.nifi.flow.ScheduledState intendedState) { try { - servicesToRestart.forEach(synchronizationOptions.getScheduledStateChangeListener()::onScheduledStateChange); + servicesToRestart.forEach(service -> { + synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(service, intendedState); + if (intendedState == org.apache.nifi.flow.ScheduledState.DISABLED) { + service.getReferences().findRecursiveReferences(ControllerServiceNode.class) + .forEach(reference -> synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(reference, org.apache.nifi.flow.ScheduledState.DISABLED)); + } else if (intendedState == org.apache.nifi.flow.ScheduledState.ENABLED) { + service.getRequiredControllerServices().forEach(requiredService -> synchronizationOptions.getScheduledStateChangeListener() + .onScheduledStateChange(requiredService, org.apache.nifi.flow.ScheduledState.ENABLED)); + } + }); } catch (final Exception e) { LOG.debug("Failed to notify listeners of ScheduledState changes", e); } } - private void notifyScheduledStateChange(final Port inputPort, final FlowSynchronizationOptions synchronizationOptions) { + private void notifyScheduledStateChange(final Port inputPort, final FlowSynchronizationOptions synchronizationOptions, final org.apache.nifi.flow.ScheduledState intendedState) { try { - synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(inputPort); + synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(inputPort, intendedState); } catch (final Exception e) { LOG.debug("Failed to notify listeners of ScheduledState changes", e); } @@ -2544,12 +2563,12 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen case INPUT_PORT: final Port inputPort = (Port) component; component.getProcessGroup().stopInputPort(inputPort); - notifyScheduledStateChange(inputPort, synchronizationOptions); + notifyScheduledStateChange(inputPort, synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED); return true; case OUTPUT_PORT: final Port outputPort = (Port) component; component.getProcessGroup().stopOutputPort(outputPort); - notifyScheduledStateChange(outputPort, synchronizationOptions); + notifyScheduledStateChange(outputPort, synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED); return true; case PROCESSOR: final ProcessorNode processorNode = (ProcessorNode) component; @@ -2574,7 +2593,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen return true; } } finally { - notifyScheduledStateChange((ComponentNode) processor, synchronizationOptions); + notifyScheduledStateChange((ComponentNode) processor, synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED); } } @@ -2612,7 +2631,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen final Future future = entry.getValue(); waitForStopCompletion(future, component, timeout, timeoutAction); - notifyScheduledStateChange(component, synchronizationOptions); + notifyScheduledStateChange(component, synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED); } if (controllerService.isActive()) { @@ -2636,7 +2655,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen // Disable the service and wait for completion, up to the timeout allowed final Future future = serviceProvider.disableControllerServicesAsync(servicesToStop); waitForStopCompletion(future, controllerService, timeout, timeoutAction); - notifyScheduledStateChange(servicesToStop, synchronizationOptions); + notifyScheduledStateChange(servicesToStop, synchronizationOptions, org.apache.nifi.flow.ScheduledState.DISABLED); } } @@ -2689,6 +2708,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen // Transition state to disabled/enabled/running context.getComponentScheduler().transitionComponentState(processor, proposed.getScheduledState()); + notifyScheduledStateChange((ComponentNode) processor, syncOptions, proposed.getScheduledState()); if (!isEqual(processor.getBundleCoordinate(), proposed.getBundle())) { final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle()); @@ -2739,7 +2759,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen final Future future = rpg.stopTransmitting(); try { - transmitting.forEach(synchronizationOptions.getScheduledStateChangeListener()::onScheduledStateChange); + transmitting.forEach(remoteGroupPort -> synchronizationOptions.getScheduledStateChangeListener() + .onScheduledStateChange(remoteGroupPort, org.apache.nifi.flow.ScheduledState.ENABLED)); } catch (final Exception e) { LOG.debug("Failed to notify listeners of ScheduledState changes", e); } @@ -2947,10 +2968,12 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen if (versionedPort.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) { if (portState != ScheduledState.RUNNING) { context.getComponentScheduler().startComponent(remoteGroupPort); + notifyScheduledStateChange(remoteGroupPort, syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING); } } else { if (portState == ScheduledState.RUNNING) { context.getComponentScheduler().stopComponent(remoteGroupPort); + notifyScheduledStateChange(remoteGroupPort, syncOptions, org.apache.nifi.flow.ScheduledState.ENABLED); } } } @@ -3112,7 +3135,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen processor.getProcessGroup().stopProcessor(processor); processor.terminate(); - notifyScheduledStateChange((ComponentNode) processor, synchronizationOptions); + notifyScheduledStateChange((ComponentNode) processor, synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED); } } @@ -3419,6 +3442,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen } break; } + notifyScheduledStateChange(reportingTask, syncOptions, proposed.getScheduledState()); } finally { reportingTask.resumeValidationTrigger(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java index 3ed688c6ab..8360d4ff74 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java @@ -1137,22 +1137,22 @@ public class StandardVersionedComponentSynchronizerTest { private List> reportingTaskUpdates = new ArrayList<>(); @Override - public void onScheduledStateChange(final ProcessorNode processor) { + public void onScheduledStateChange(final ProcessorNode processor, final ScheduledState intendedState) { processorUpdates.add(new ScheduledStateUpdate<>(processor, processor.getScheduledState())); } @Override - public void onScheduledStateChange(ControllerServiceNode controllerService) { + public void onScheduledStateChange(ControllerServiceNode controllerService, final ScheduledState intendedState) { serviceUpdates.add(new ControllerServiceStateUpdate(controllerService, controllerService.getState())); } @Override - public void onScheduledStateChange(ReportingTaskNode reportingTask) { + public void onScheduledStateChange(ReportingTaskNode reportingTask, final ScheduledState intendedState) { reportingTaskUpdates.add(new ScheduledStateUpdate<>(reportingTask, reportingTask.getScheduledState())); } @Override - public void onScheduledStateChange(final Port port) { + public void onScheduledStateChange(final Port port, final ScheduledState intendedState) { portUpdates.add(new ScheduledStateUpdate<>(port, port.getScheduledState())); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ScheduledStateChangeListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ScheduledStateChangeListener.java index d2c45e3165..64b17e4213 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ScheduledStateChangeListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ScheduledStateChangeListener.java @@ -21,34 +21,35 @@ import org.apache.nifi.connectable.Port; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.flow.ScheduledState; public interface ScheduledStateChangeListener { - void onScheduledStateChange(ProcessorNode processor); + void onScheduledStateChange(ProcessorNode processor, ScheduledState intendedState); - void onScheduledStateChange(Port port); + void onScheduledStateChange(Port port, ScheduledState intendedState); - void onScheduledStateChange(ControllerServiceNode controllerService); + void onScheduledStateChange(ControllerServiceNode controllerService, ScheduledState intendedState); - void onScheduledStateChange(ReportingTaskNode reportingTask); + void onScheduledStateChange(ReportingTaskNode reportingTask, ScheduledState intendedState); ScheduledStateChangeListener EMPTY = new ScheduledStateChangeListener() { @Override - public void onScheduledStateChange(ProcessorNode processor) { + public void onScheduledStateChange(ProcessorNode processor, ScheduledState intendedState) { } @Override - public void onScheduledStateChange(Port port) { + public void onScheduledStateChange(Port port, ScheduledState intendedState) { } @Override - public void onScheduledStateChange(ControllerServiceNode controllerService) { + public void onScheduledStateChange(ControllerServiceNode controllerService, ScheduledState intendedState) { } @Override - public void onScheduledStateChange(ReportingTaskNode reportingTask) { + public void onScheduledStateChange(ReportingTaskNode reportingTask, ScheduledState intendedState) { } };