diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java index 7008b2b2f1..4d23cc6fa1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java @@ -1131,7 +1131,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen final Set servicesToRestart = new HashSet<>(); try { - stopControllerService(controllerService, proposed, timeout, synchronizationOptions.getComponentStopTimeoutAction(), referencesToRestart, servicesToRestart); + stopControllerService(controllerService, proposed, timeout, synchronizationOptions.getComponentStopTimeoutAction(), + referencesToRestart, servicesToRestart, synchronizationOptions); verifyCanSynchronize(controllerService, proposed); try { @@ -1161,10 +1162,12 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen } finally { // Re-enable the controller service if necessary serviceProvider.enableControllerServicesAsync(servicesToRestart); + notifyScheduledStateChange(servicesToRestart, synchronizationOptions); // Restart any components that need to be restarted. if (controllerService != null) { serviceProvider.scheduleReferencingComponents(controllerService, referencesToRestart, context.getComponentScheduler()); + referencesToRestart.forEach(componentNode -> notifyScheduledStateChange(componentNode, synchronizationOptions)); } } } finally { @@ -1438,7 +1441,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen final Set referencingServices = referenceManager.getControllerServicesReferencing(parameterContext, paramName); for (final ControllerServiceNode referencingService : referencingServices) { - stopControllerService(referencingService, null, timeout, synchronizationOptions.getComponentStopTimeoutAction(), componentsToRestart, servicesToRestart); + stopControllerService(referencingService, null, timeout, synchronizationOptions.getComponentStopTimeoutAction(), componentsToRestart, servicesToRestart, + synchronizationOptions); servicesToRestart.add(referencingService); } } @@ -1492,6 +1496,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen for (final ComponentNode stoppedComponent : componentsToRestart) { if (stoppedComponent instanceof Connectable) { context.getComponentScheduler().startComponent((Connectable) stoppedComponent); + notifyScheduledStateChange(stoppedComponent, synchronizationOptions); } } } @@ -1545,7 +1550,9 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen waitFor(timeout, () -> isDoneProcessing(processGroup)); // Disable all Controller Services - final Future disableServicesFuture = context.getControllerServiceProvider().disableControllerServicesAsync(processGroup.findAllControllerServices()); + final Collection controllerServices = processGroup.findAllControllerServices(); + final Future disableServicesFuture = context.getControllerServiceProvider().disableControllerServicesAsync(controllerServices); + notifyScheduledStateChange(controllerServices, synchronizationOptions); try { disableServicesFuture.get(timeout, TimeUnit.MILLISECONDS); } catch (final ExecutionException ee) { @@ -1648,6 +1655,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen // Stop all necessary enabled/active Controller Services final Future serviceDisableFuture = context.getControllerServiceProvider().disableControllerServicesAsync(controllerServicesToStop); + notifyScheduledStateChange(controllerServicesToStop, synchronizationOptions); try { serviceDisableFuture.get(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } catch (ExecutionException e) { @@ -1675,9 +1683,11 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen } finally { // Re-enable all Controller Services that we disabled and restart all processors context.getControllerServiceProvider().enableControllerServicesAsync(controllerServicesToStop); + notifyScheduledStateChange(controllerServicesToStop, synchronizationOptions); for (final ProcessorNode processor : processorsToStop) { processor.getProcessGroup().startProcessor(processor, false); + notifyScheduledStateChange((ComponentNode) processor,synchronizationOptions); } } } finally { @@ -2109,9 +2119,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen } } finally { // Restart any components that need to be restarted. - for (final Connectable stoppedComponent : toRestart) { - context.getComponentScheduler().startComponent(stoppedComponent); - } + startComponents(toRestart, synchronizationOptions); } } @@ -2236,15 +2244,20 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen } } finally { // Restart any components that need to be restarted. - for (final Connectable stoppedComponent : toRestart) { - context.getComponentScheduler().startComponent(stoppedComponent); - } + startComponents(toRestart, synchronizationOptions); } } finally { synchronizationOptions.getComponentScheduler().resume(); } } + private void startComponents(final Collection stoppedComponents, final FlowSynchronizationOptions synchronizationOptions) { + for (final Connectable stoppedComponent : stoppedComponents) { + context.getComponentScheduler().startComponent(stoppedComponent); + notifyScheduledStateChange(stoppedComponent, synchronizationOptions); + } + } + private void updatePort(final Port port, final VersionedPort proposed, final String temporaryName) { final String name = temporaryName != null ? temporaryName : proposed.getName(); port.setComments(proposed.getComments()); @@ -2422,9 +2435,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen } } finally { // Restart any components that need to be restarted. - for (final Connectable stoppedComponent : toRestart) { - context.getComponentScheduler().startComponent(stoppedComponent); - } + startComponents(toRestart, synchronizationOptions); } } finally { synchronizationOptions.getComponentScheduler().resume(); @@ -2476,6 +2487,50 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen return stoppedComponents; } + private void notifyScheduledStateChange(final Connectable component, final FlowSynchronizationOptions synchronizationOptions) { + try { + if (component instanceof ProcessorNode) { + synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ProcessorNode) component); + } else if (component instanceof Port) { + synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((Port) component); + } + } catch (final Exception e) { + LOG.debug("Failed to notify listeners of ScheduledState changes", e); + } + } + + private void notifyScheduledStateChange(final ComponentNode component, final FlowSynchronizationOptions synchronizationOptions) { + try { + if (component instanceof ProcessorNode) { + synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ProcessorNode) component); + } else if (component instanceof Port) { + synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((Port) component); + } else if (component instanceof ControllerServiceNode) { + synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ControllerServiceNode) component); + } else if (component instanceof ReportingTaskNode) { + synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ReportingTaskNode) component); + } + } catch (final Exception e) { + LOG.debug("Failed to notify listeners of ScheduledState changes", e); + } + } + + private void notifyScheduledStateChange(final Collection servicesToRestart, final FlowSynchronizationOptions synchronizationOptions) { + try { + servicesToRestart.forEach(synchronizationOptions.getScheduledStateChangeListener()::onScheduledStateChange); + } catch (final Exception e) { + LOG.debug("Failed to notify listeners of ScheduledState changes", e); + } + } + + private void notifyScheduledStateChange(final Port inputPort, final FlowSynchronizationOptions synchronizationOptions) { + try { + synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(inputPort); + } catch (final Exception e) { + LOG.debug("Failed to notify listeners of ScheduledState changes", e); + } + } + private boolean stopOrTerminate(final Connectable component, final long timeout, final FlowSynchronizationOptions synchronizationOptions) throws TimeoutException, FlowSynchronizationException { if (!component.isRunning()) { return false; @@ -2484,13 +2539,18 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen final ConnectableType connectableType = component.getConnectableType(); switch (connectableType) { case INPUT_PORT: - component.getProcessGroup().stopInputPort((Port) component); + final Port inputPort = (Port) component; + component.getProcessGroup().stopInputPort(inputPort); + notifyScheduledStateChange(inputPort, synchronizationOptions); return true; case OUTPUT_PORT: - component.getProcessGroup().stopOutputPort((Port) component); + final Port outputPort = (Port) component; + component.getProcessGroup().stopOutputPort(outputPort); + notifyScheduledStateChange(outputPort, synchronizationOptions); return true; case PROCESSOR: - return stopOrTerminate((ProcessorNode) component, timeout, synchronizationOptions); + final ProcessorNode processorNode = (ProcessorNode) component; + return stopOrTerminate(processorNode, timeout, synchronizationOptions); default: return false; } @@ -2499,6 +2559,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen private boolean stopOrTerminate(final ProcessorNode processor, final long timeout, final FlowSynchronizationOptions synchronizationOptions) throws TimeoutException, FlowSynchronizationException { try { LOG.debug("Stopping {} in order to synchronize it with proposed version", processor); + return stopProcessor(processor, timeout); } catch (final TimeoutException te) { switch (synchronizationOptions.getComponentStopTimeoutAction()) { @@ -2509,6 +2570,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen processor.terminate(); return true; } + } finally { + notifyScheduledStateChange((ComponentNode) processor, synchronizationOptions); } } @@ -2531,7 +2594,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen private void stopControllerService(final ControllerServiceNode controllerService, final VersionedControllerService proposed, final long timeout, final FlowSynchronizationOptions.ComponentStopTimeoutAction timeoutAction, final Set referencesStopped, - final Set servicesDisabled) throws FlowSynchronizationException, + final Set servicesDisabled, final FlowSynchronizationOptions synchronizationOptions) throws FlowSynchronizationException, TimeoutException, InterruptedException { final ControllerServiceProvider serviceProvider = context.getControllerServiceProvider(); if (controllerService == null) { @@ -2546,6 +2609,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen final Future future = entry.getValue(); waitForStopCompletion(future, component, timeout, timeoutAction); + notifyScheduledStateChange(component, synchronizationOptions); } if (controllerService.isActive()) { @@ -2569,6 +2633,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen // Disable the service and wait for completion, up to the timeout allowed final Future future = serviceProvider.disableControllerServicesAsync(servicesToStop); waitForStopCompletion(future, controllerService, timeout, timeoutAction); + notifyScheduledStateChange(servicesToStop, synchronizationOptions); } } @@ -2663,13 +2728,18 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen synchronizationOptions.getComponentScheduler().pause(); try { - // Stop the processor, if necessary, in order to update it. + // Stop the rpg, if necessary, in order to update it. final Set toRestart = new HashSet<>(); if (rpg != null) { if (rpg.isTransmitting()) { final Set transmitting = getTransmittingPorts(rpg); final Future future = rpg.stopTransmitting(); + try { + transmitting.forEach(synchronizationOptions.getScheduledStateChangeListener()::onScheduledStateChange); + } catch (final Exception e) { + LOG.debug("Failed to notify listeners of ScheduledState changes", e); + } waitForStopCompletion(future, rpg, timeout, synchronizationOptions.getComponentStopTimeoutAction()); final boolean proposedTransmitting = isTransmitting(proposed); @@ -2708,9 +2778,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen throw new FlowSynchronizationException("Failed to synchronize " + rpg + " with proposed version", e); } finally { // Restart any components that need to be restarted. - for (final Connectable stoppedComponent : toRestart) { - context.getComponentScheduler().startComponent(stoppedComponent); - } + startComponents(toRestart, synchronizationOptions); } } finally { synchronizationOptions.getComponentScheduler().resume(); @@ -2966,7 +3034,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen } LOG.info("Components upstream of {} did not stop in time. Will terminate {}", connection, upstream); - terminateComponents(upstream); + terminateComponents(upstream, synchronizationOptions); stoppedComponents = upstream; } @@ -3006,9 +3074,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen } finally { // If not removing the connection, restart any component that we stopped. if (proposedConnection != null) { - for (final Connectable component : stoppedComponents) { - context.getComponentScheduler().startComponent(component); - } + startComponents(stoppedComponents, synchronizationOptions); } } } @@ -3030,7 +3096,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen } } - private void terminateComponents(final Set components) { + private void terminateComponents(final Set components, final FlowSynchronizationOptions synchronizationOptions) { for (final Connectable component : components) { if (!(component instanceof ProcessorNode)) { continue; @@ -3043,6 +3109,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen processor.getProcessGroup().stopProcessor(processor); processor.terminate(); + notifyScheduledStateChange((ComponentNode) processor, synchronizationOptions); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java index 0dab82bc40..98a6d8a8e8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java @@ -26,6 +26,7 @@ import org.apache.nifi.connectable.Port; import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReloadComponent; +import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.LoadBalanceStrategy; @@ -48,6 +49,7 @@ import org.apache.nifi.flow.VersionedPort; import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.groups.ComponentIdGenerator; import org.apache.nifi.groups.ComponentScheduler; +import org.apache.nifi.groups.ScheduledStateChangeListener; import org.apache.nifi.groups.FlowSynchronizationOptions; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.LogLevel; @@ -77,6 +79,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -126,6 +129,7 @@ public class StandardVersionedComponentSynchronizerTest { private ControllerServiceProvider controllerServiceProvider; private ParameterContextManager parameterContextManager; private ParameterReferenceManager parameterReferenceManager; + private CapturingScheduledStateChangeListener scheduledStateChangeListener; private final Set queuesWithData = Collections.synchronizedSet(new HashSet<>()); private final Bundle bundle = new Bundle("group", "artifact", "version 1.0"); @@ -197,10 +201,13 @@ public class StandardVersionedComponentSynchronizerTest { when(group.getInputPorts()).thenReturn(Collections.singleton(inputPort)); when(group.getOutputPorts()).thenReturn(Collections.singleton(outputPort)); + scheduledStateChangeListener = new CapturingScheduledStateChangeListener(); + synchronizationOptions = new FlowSynchronizationOptions.Builder() .componentIdGenerator(componentIdGenerator) .componentComparisonIdLookup(VersionedComponent::getIdentifier) .componentScheduler(componentScheduler) + .scheduledStateChangeListener(scheduledStateChangeListener) .build(); synchronizer = new StandardVersionedComponentSynchronizer(context); @@ -213,6 +220,7 @@ public class StandardVersionedComponentSynchronizerTest { .componentIdGenerator(componentIdGenerator) .componentComparisonIdLookup(VersionedComponent::getIdentifier) .componentScheduler(componentScheduler) + .scheduledStateChangeListener(scheduledStateChangeListener) .componentStopTimeout(Duration.ofMillis(10)) .componentStopTimeoutAction(timeoutAction) .build(); @@ -382,6 +390,8 @@ public class StandardVersionedComponentSynchronizerTest { verify(connectionAB, times(1)).setName("Hello"); verify(connectionAB, times(1)).setRelationships(Collections.singleton(new Relationship.Builder().name("success").build())); + + scheduledStateChangeListener.assertNumProcessorUpdates(0); } @Test @@ -399,6 +409,8 @@ public class StandardVersionedComponentSynchronizerTest { // Ensure that the source was stopped and restarted verifyStopped(processorA); verifyRestarted(processorA); + + verifyCallbackIndicatedRestart(processorA); } @Test @@ -420,6 +432,8 @@ public class StandardVersionedComponentSynchronizerTest { // Ensure that the source was stopped and restarted verifyStopped(processorA); verifyRestarted(processorA); + + verifyCallbackIndicatedRestart(processorA); } @Test @@ -444,6 +458,29 @@ public class StandardVersionedComponentSynchronizerTest { // Ensure that the source was stopped and restarted verifyStopped(processorA); verifyNotRestarted(processorA); + verifyCallbackIndicatedStopOnly(processorA); + } + + private void verifyCallbackIndicatedRestart(final ProcessorNode... processors) { + for (final ProcessorNode processor : processors) { + scheduledStateChangeListener.assertProcessorUpdates(new ScheduledStateUpdate<>(processor, org.apache.nifi.controller.ScheduledState.STOPPED), + new ScheduledStateUpdate<>(processor, org.apache.nifi.controller.ScheduledState.RUNNING)); + } + scheduledStateChangeListener.assertNumProcessorUpdates(processors.length * 2); + } + + private void verifyCallbackIndicatedStopOnly(final ProcessorNode... processors) { + for (final ProcessorNode processor : processors) { + scheduledStateChangeListener.assertProcessorUpdates(new ScheduledStateUpdate<>(processor, org.apache.nifi.controller.ScheduledState.STOPPED)); + } + scheduledStateChangeListener.assertNumProcessorUpdates(processors.length); + } + + private void verifyCallbackIndicatedStartOnly(final ProcessorNode... processors) { + for (final ProcessorNode processor : processors) { + scheduledStateChangeListener.assertProcessorUpdates(new ScheduledStateUpdate<>(processor, org.apache.nifi.controller.ScheduledState.RUNNING)); + } + scheduledStateChangeListener.assertNumProcessorUpdates(processors.length); } @Test @@ -456,6 +493,7 @@ public class StandardVersionedComponentSynchronizerTest { verifyStopped(processorA); verifyNotRestarted(processorA); verify(group).removeConnection(connectionAB); + verifyCallbackIndicatedStopOnly(processorA); } @Test @@ -477,6 +515,7 @@ public class StandardVersionedComponentSynchronizerTest { // can be removed. verifyStopped(processorA); verifyNotRestarted(processorA); + verifyCallbackIndicatedStopOnly(processorA); } @Test @@ -513,6 +552,7 @@ public class StandardVersionedComponentSynchronizerTest { // Ensure that the source was stopped, destination was stopped, and the connection was removed. verifyStopped(processorA); verifyNotRestarted(processorA); + verifyCallbackIndicatedStopOnly(processorB, processorA); verifyStopped(processorB); verifyNotRestarted(processorB); verify(group, times(1)).removeConnection(connectionAB); @@ -1032,4 +1072,73 @@ public class StandardVersionedComponentSynchronizerTest { return versionedPort; } + + private class ScheduledStateUpdate { + private T component; + private org.apache.nifi.controller.ScheduledState state; + + public ScheduledStateUpdate(T component, org.apache.nifi.controller.ScheduledState state) { + this.component = component; + this.state = state; + } + } + + private class ControllerServiceStateUpdate { + private ControllerServiceNode controllerService; + private ControllerServiceState state; + + public ControllerServiceStateUpdate(ControllerServiceNode controllerService, ControllerServiceState state) { + this.controllerService = controllerService; + this.state = state; + } + } + + private class CapturingScheduledStateChangeListener implements ScheduledStateChangeListener { + + private List> processorUpdates = new ArrayList<>(); + private List> portUpdates = new ArrayList<>(); + private List serviceUpdates = new ArrayList<>(); + private List> reportingTaskUpdates = new ArrayList<>(); + + @Override + public void onScheduledStateChange(final ProcessorNode processor) { + processorUpdates.add(new ScheduledStateUpdate<>(processor, processor.getScheduledState())); + } + + @Override + public void onScheduledStateChange(ControllerServiceNode controllerService) { + serviceUpdates.add(new ControllerServiceStateUpdate(controllerService, controllerService.getState())); + } + + @Override + public void onScheduledStateChange(ReportingTaskNode reportingTask) { + reportingTaskUpdates.add(new ScheduledStateUpdate<>(reportingTask, reportingTask.getScheduledState())); + } + + @Override + public void onScheduledStateChange(final Port port) { + portUpdates.add(new ScheduledStateUpdate<>(port, port.getScheduledState())); + } + + void assertNumProcessorUpdates(int expectedNum) { + assertEquals("Expected " + expectedNum + " processor state changes", expectedNum, processorUpdates.size()); + } + + void assertProcessorUpdates(final ScheduledStateUpdate... updates) { + final Iterator> it = processorUpdates.iterator(); + for (final ScheduledStateUpdate expectedUpdate : updates) { + final ScheduledStateUpdate capturedUpdate = it.next(); + assertEquals(expectedUpdate.component.getName(), capturedUpdate.component.getName()); + if (expectedUpdate.state == org.apache.nifi.controller.ScheduledState.RUNNING) { + verifyRestarted(capturedUpdate.component); + } else if (expectedUpdate.state == org.apache.nifi.controller.ScheduledState.STOPPED) { + verifyStopped(capturedUpdate.component); + } + } + } + + void assertNumPortUpdates(int expectedNum) { + assertEquals("Expected " + expectedNum + " port state changes", expectedNum, portUpdates.size()); + } + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowSynchronizationOptions.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowSynchronizationOptions.java index b085b10f6b..a1ca499ae4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowSynchronizationOptions.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowSynchronizationOptions.java @@ -35,6 +35,7 @@ public class FlowSynchronizationOptions { private final boolean updateRpgUrls; private final Duration componentStopTimeout; private final ComponentStopTimeoutAction timeoutAction; + private final ScheduledStateChangeListener scheduledStateChangeListener; private FlowSynchronizationOptions(final Builder builder) { this.componentIdGenerator = builder.componentIdGenerator; @@ -49,6 +50,7 @@ public class FlowSynchronizationOptions { this.updateRpgUrls = builder.updateRpgUrls; this.componentStopTimeout = builder.componentStopTimeout; this.timeoutAction = builder.timeoutAction; + this.scheduledStateChangeListener = builder.scheduledStateChangeListener; } public ComponentIdGenerator getComponentIdGenerator() { @@ -99,6 +101,10 @@ public class FlowSynchronizationOptions { return timeoutAction; } + public ScheduledStateChangeListener getScheduledStateChangeListener() { + return scheduledStateChangeListener; + } + public static class Builder { private ComponentIdGenerator componentIdGenerator; private Function componentComparisonIdLookup; @@ -109,6 +115,7 @@ public class FlowSynchronizationOptions { private boolean updateGroupVersionControlSnapshot = true; private boolean updateExistingVariables = false; private boolean updateRpgUrls = false; + private ScheduledStateChangeListener scheduledStateChangeListener; private PropertyDecryptor propertyDecryptor = value -> value; private Duration componentStopTimeout = Duration.ofSeconds(30); private ComponentStopTimeoutAction timeoutAction = ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION; @@ -247,6 +254,15 @@ public class FlowSynchronizationOptions { return this; } + /** + * Specifies a callback whose methods will be called when component scheduled states are updated by the synchronizer + * @param listener the ScheduledStateChangeListener to use + * @return the builder + */ + public Builder scheduledStateChangeListener(final ScheduledStateChangeListener listener) { + this.scheduledStateChangeListener = listener; + return this; + } public FlowSynchronizationOptions build() { if (componentIdGenerator == null) { @@ -258,6 +274,9 @@ public class FlowSynchronizationOptions { if (componentScheduler == null) { throw new IllegalStateException("Must set Component Scheduler"); } + if (scheduledStateChangeListener == null) { + scheduledStateChangeListener = ScheduledStateChangeListener.EMPTY; + } return new FlowSynchronizationOptions(this); } @@ -276,6 +295,7 @@ public class FlowSynchronizationOptions { builder.propertyDecryptor = options.getPropertyDecryptor(); builder.componentStopTimeout = options.getComponentStopTimeout(); builder.timeoutAction = options.getComponentStopTimeoutAction(); + builder.scheduledStateChangeListener = options.getScheduledStateChangeListener(); return builder; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ScheduledStateChangeListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ScheduledStateChangeListener.java new file mode 100644 index 0000000000..d2c45e3165 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ScheduledStateChangeListener.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +import org.apache.nifi.connectable.Port; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.controller.service.ControllerServiceNode; + +public interface ScheduledStateChangeListener { + void onScheduledStateChange(ProcessorNode processor); + + void onScheduledStateChange(Port port); + + void onScheduledStateChange(ControllerServiceNode controllerService); + + void onScheduledStateChange(ReportingTaskNode reportingTask); + + ScheduledStateChangeListener EMPTY = new ScheduledStateChangeListener() { + @Override + public void onScheduledStateChange(ProcessorNode processor) { + + } + + @Override + public void onScheduledStateChange(Port port) { + + } + + @Override + public void onScheduledStateChange(ControllerServiceNode controllerService) { + + } + + @Override + public void onScheduledStateChange(ReportingTaskNode reportingTask) { + + } + }; +}