NIFI-10401: Adding ScheduledStateChangeListener to synchronizer (#6341)

NIFI-10401: Adding ScheduledStateChangeListener to synchronizer
This commit is contained in:
Joe Gresock 2022-09-15 09:15:44 -04:00 committed by GitHub
parent 3987d39cdc
commit 51d01f874e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 276 additions and 25 deletions

View File

@ -1131,7 +1131,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
final Set<ControllerServiceNode> servicesToRestart = new HashSet<>(); final Set<ControllerServiceNode> servicesToRestart = new HashSet<>();
try { try {
stopControllerService(controllerService, proposed, timeout, synchronizationOptions.getComponentStopTimeoutAction(), referencesToRestart, servicesToRestart); stopControllerService(controllerService, proposed, timeout, synchronizationOptions.getComponentStopTimeoutAction(),
referencesToRestart, servicesToRestart, synchronizationOptions);
verifyCanSynchronize(controllerService, proposed); verifyCanSynchronize(controllerService, proposed);
try { try {
@ -1161,10 +1162,12 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
} finally { } finally {
// Re-enable the controller service if necessary // Re-enable the controller service if necessary
serviceProvider.enableControllerServicesAsync(servicesToRestart); serviceProvider.enableControllerServicesAsync(servicesToRestart);
notifyScheduledStateChange(servicesToRestart, synchronizationOptions);
// Restart any components that need to be restarted. // Restart any components that need to be restarted.
if (controllerService != null) { if (controllerService != null) {
serviceProvider.scheduleReferencingComponents(controllerService, referencesToRestart, context.getComponentScheduler()); serviceProvider.scheduleReferencingComponents(controllerService, referencesToRestart, context.getComponentScheduler());
referencesToRestart.forEach(componentNode -> notifyScheduledStateChange(componentNode, synchronizationOptions));
} }
} }
} finally { } finally {
@ -1438,7 +1441,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
final Set<ControllerServiceNode> referencingServices = referenceManager.getControllerServicesReferencing(parameterContext, paramName); final Set<ControllerServiceNode> referencingServices = referenceManager.getControllerServicesReferencing(parameterContext, paramName);
for (final ControllerServiceNode referencingService : referencingServices) { for (final ControllerServiceNode referencingService : referencingServices) {
stopControllerService(referencingService, null, timeout, synchronizationOptions.getComponentStopTimeoutAction(), componentsToRestart, servicesToRestart); stopControllerService(referencingService, null, timeout, synchronizationOptions.getComponentStopTimeoutAction(), componentsToRestart, servicesToRestart,
synchronizationOptions);
servicesToRestart.add(referencingService); servicesToRestart.add(referencingService);
} }
} }
@ -1492,6 +1496,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
for (final ComponentNode stoppedComponent : componentsToRestart) { for (final ComponentNode stoppedComponent : componentsToRestart) {
if (stoppedComponent instanceof Connectable) { if (stoppedComponent instanceof Connectable) {
context.getComponentScheduler().startComponent((Connectable) stoppedComponent); context.getComponentScheduler().startComponent((Connectable) stoppedComponent);
notifyScheduledStateChange(stoppedComponent, synchronizationOptions);
} }
} }
} }
@ -1545,7 +1550,9 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
waitFor(timeout, () -> isDoneProcessing(processGroup)); waitFor(timeout, () -> isDoneProcessing(processGroup));
// Disable all Controller Services // Disable all Controller Services
final Future<Void> disableServicesFuture = context.getControllerServiceProvider().disableControllerServicesAsync(processGroup.findAllControllerServices()); final Collection<ControllerServiceNode> controllerServices = processGroup.findAllControllerServices();
final Future<Void> disableServicesFuture = context.getControllerServiceProvider().disableControllerServicesAsync(controllerServices);
notifyScheduledStateChange(controllerServices, synchronizationOptions);
try { try {
disableServicesFuture.get(timeout, TimeUnit.MILLISECONDS); disableServicesFuture.get(timeout, TimeUnit.MILLISECONDS);
} catch (final ExecutionException ee) { } catch (final ExecutionException ee) {
@ -1648,6 +1655,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
// Stop all necessary enabled/active Controller Services // Stop all necessary enabled/active Controller Services
final Future<Void> serviceDisableFuture = context.getControllerServiceProvider().disableControllerServicesAsync(controllerServicesToStop); final Future<Void> serviceDisableFuture = context.getControllerServiceProvider().disableControllerServicesAsync(controllerServicesToStop);
notifyScheduledStateChange(controllerServicesToStop, synchronizationOptions);
try { try {
serviceDisableFuture.get(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS); serviceDisableFuture.get(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
} catch (ExecutionException e) { } catch (ExecutionException e) {
@ -1675,9 +1683,11 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
} finally { } finally {
// Re-enable all Controller Services that we disabled and restart all processors // Re-enable all Controller Services that we disabled and restart all processors
context.getControllerServiceProvider().enableControllerServicesAsync(controllerServicesToStop); context.getControllerServiceProvider().enableControllerServicesAsync(controllerServicesToStop);
notifyScheduledStateChange(controllerServicesToStop, synchronizationOptions);
for (final ProcessorNode processor : processorsToStop) { for (final ProcessorNode processor : processorsToStop) {
processor.getProcessGroup().startProcessor(processor, false); processor.getProcessGroup().startProcessor(processor, false);
notifyScheduledStateChange((ComponentNode) processor,synchronizationOptions);
} }
} }
} finally { } finally {
@ -2109,9 +2119,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
} }
} finally { } finally {
// Restart any components that need to be restarted. // Restart any components that need to be restarted.
for (final Connectable stoppedComponent : toRestart) { startComponents(toRestart, synchronizationOptions);
context.getComponentScheduler().startComponent(stoppedComponent);
}
} }
} }
@ -2236,15 +2244,20 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
} }
} finally { } finally {
// Restart any components that need to be restarted. // Restart any components that need to be restarted.
for (final Connectable stoppedComponent : toRestart) { startComponents(toRestart, synchronizationOptions);
context.getComponentScheduler().startComponent(stoppedComponent);
}
} }
} finally { } finally {
synchronizationOptions.getComponentScheduler().resume(); synchronizationOptions.getComponentScheduler().resume();
} }
} }
private void startComponents(final Collection<Connectable> stoppedComponents, final FlowSynchronizationOptions synchronizationOptions) {
for (final Connectable stoppedComponent : stoppedComponents) {
context.getComponentScheduler().startComponent(stoppedComponent);
notifyScheduledStateChange(stoppedComponent, synchronizationOptions);
}
}
private void updatePort(final Port port, final VersionedPort proposed, final String temporaryName) { private void updatePort(final Port port, final VersionedPort proposed, final String temporaryName) {
final String name = temporaryName != null ? temporaryName : proposed.getName(); final String name = temporaryName != null ? temporaryName : proposed.getName();
port.setComments(proposed.getComments()); port.setComments(proposed.getComments());
@ -2422,9 +2435,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
} }
} finally { } finally {
// Restart any components that need to be restarted. // Restart any components that need to be restarted.
for (final Connectable stoppedComponent : toRestart) { startComponents(toRestart, synchronizationOptions);
context.getComponentScheduler().startComponent(stoppedComponent);
}
} }
} finally { } finally {
synchronizationOptions.getComponentScheduler().resume(); synchronizationOptions.getComponentScheduler().resume();
@ -2476,6 +2487,50 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
return stoppedComponents; return stoppedComponents;
} }
private void notifyScheduledStateChange(final Connectable component, final FlowSynchronizationOptions synchronizationOptions) {
try {
if (component instanceof ProcessorNode) {
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ProcessorNode) component);
} else if (component instanceof Port) {
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((Port) component);
}
} catch (final Exception e) {
LOG.debug("Failed to notify listeners of ScheduledState changes", e);
}
}
private void notifyScheduledStateChange(final ComponentNode component, final FlowSynchronizationOptions synchronizationOptions) {
try {
if (component instanceof ProcessorNode) {
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ProcessorNode) component);
} else if (component instanceof Port) {
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((Port) component);
} else if (component instanceof ControllerServiceNode) {
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ControllerServiceNode) component);
} else if (component instanceof ReportingTaskNode) {
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ReportingTaskNode) component);
}
} catch (final Exception e) {
LOG.debug("Failed to notify listeners of ScheduledState changes", e);
}
}
private void notifyScheduledStateChange(final Collection<ControllerServiceNode> servicesToRestart, final FlowSynchronizationOptions synchronizationOptions) {
try {
servicesToRestart.forEach(synchronizationOptions.getScheduledStateChangeListener()::onScheduledStateChange);
} catch (final Exception e) {
LOG.debug("Failed to notify listeners of ScheduledState changes", e);
}
}
private void notifyScheduledStateChange(final Port inputPort, final FlowSynchronizationOptions synchronizationOptions) {
try {
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(inputPort);
} catch (final Exception e) {
LOG.debug("Failed to notify listeners of ScheduledState changes", e);
}
}
private boolean stopOrTerminate(final Connectable component, final long timeout, final FlowSynchronizationOptions synchronizationOptions) throws TimeoutException, FlowSynchronizationException { private boolean stopOrTerminate(final Connectable component, final long timeout, final FlowSynchronizationOptions synchronizationOptions) throws TimeoutException, FlowSynchronizationException {
if (!component.isRunning()) { if (!component.isRunning()) {
return false; return false;
@ -2484,13 +2539,18 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
final ConnectableType connectableType = component.getConnectableType(); final ConnectableType connectableType = component.getConnectableType();
switch (connectableType) { switch (connectableType) {
case INPUT_PORT: case INPUT_PORT:
component.getProcessGroup().stopInputPort((Port) component); final Port inputPort = (Port) component;
component.getProcessGroup().stopInputPort(inputPort);
notifyScheduledStateChange(inputPort, synchronizationOptions);
return true; return true;
case OUTPUT_PORT: case OUTPUT_PORT:
component.getProcessGroup().stopOutputPort((Port) component); final Port outputPort = (Port) component;
component.getProcessGroup().stopOutputPort(outputPort);
notifyScheduledStateChange(outputPort, synchronizationOptions);
return true; return true;
case PROCESSOR: case PROCESSOR:
return stopOrTerminate((ProcessorNode) component, timeout, synchronizationOptions); final ProcessorNode processorNode = (ProcessorNode) component;
return stopOrTerminate(processorNode, timeout, synchronizationOptions);
default: default:
return false; return false;
} }
@ -2499,6 +2559,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
private boolean stopOrTerminate(final ProcessorNode processor, final long timeout, final FlowSynchronizationOptions synchronizationOptions) throws TimeoutException, FlowSynchronizationException { private boolean stopOrTerminate(final ProcessorNode processor, final long timeout, final FlowSynchronizationOptions synchronizationOptions) throws TimeoutException, FlowSynchronizationException {
try { try {
LOG.debug("Stopping {} in order to synchronize it with proposed version", processor); LOG.debug("Stopping {} in order to synchronize it with proposed version", processor);
return stopProcessor(processor, timeout); return stopProcessor(processor, timeout);
} catch (final TimeoutException te) { } catch (final TimeoutException te) {
switch (synchronizationOptions.getComponentStopTimeoutAction()) { switch (synchronizationOptions.getComponentStopTimeoutAction()) {
@ -2509,6 +2570,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
processor.terminate(); processor.terminate();
return true; return true;
} }
} finally {
notifyScheduledStateChange((ComponentNode) processor, synchronizationOptions);
} }
} }
@ -2531,7 +2594,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
private void stopControllerService(final ControllerServiceNode controllerService, final VersionedControllerService proposed, final long timeout, private void stopControllerService(final ControllerServiceNode controllerService, final VersionedControllerService proposed, final long timeout,
final FlowSynchronizationOptions.ComponentStopTimeoutAction timeoutAction, final Set<ComponentNode> referencesStopped, final FlowSynchronizationOptions.ComponentStopTimeoutAction timeoutAction, final Set<ComponentNode> referencesStopped,
final Set<ControllerServiceNode> servicesDisabled) throws FlowSynchronizationException, final Set<ControllerServiceNode> servicesDisabled, final FlowSynchronizationOptions synchronizationOptions) throws FlowSynchronizationException,
TimeoutException, InterruptedException { TimeoutException, InterruptedException {
final ControllerServiceProvider serviceProvider = context.getControllerServiceProvider(); final ControllerServiceProvider serviceProvider = context.getControllerServiceProvider();
if (controllerService == null) { if (controllerService == null) {
@ -2546,6 +2609,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
final Future<Void> future = entry.getValue(); final Future<Void> future = entry.getValue();
waitForStopCompletion(future, component, timeout, timeoutAction); waitForStopCompletion(future, component, timeout, timeoutAction);
notifyScheduledStateChange(component, synchronizationOptions);
} }
if (controllerService.isActive()) { if (controllerService.isActive()) {
@ -2569,6 +2633,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
// Disable the service and wait for completion, up to the timeout allowed // Disable the service and wait for completion, up to the timeout allowed
final Future<Void> future = serviceProvider.disableControllerServicesAsync(servicesToStop); final Future<Void> future = serviceProvider.disableControllerServicesAsync(servicesToStop);
waitForStopCompletion(future, controllerService, timeout, timeoutAction); waitForStopCompletion(future, controllerService, timeout, timeoutAction);
notifyScheduledStateChange(servicesToStop, synchronizationOptions);
} }
} }
@ -2663,13 +2728,18 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
synchronizationOptions.getComponentScheduler().pause(); synchronizationOptions.getComponentScheduler().pause();
try { try {
// Stop the processor, if necessary, in order to update it. // Stop the rpg, if necessary, in order to update it.
final Set<Connectable> toRestart = new HashSet<>(); final Set<Connectable> toRestart = new HashSet<>();
if (rpg != null) { if (rpg != null) {
if (rpg.isTransmitting()) { if (rpg.isTransmitting()) {
final Set<RemoteGroupPort> transmitting = getTransmittingPorts(rpg); final Set<RemoteGroupPort> transmitting = getTransmittingPorts(rpg);
final Future<?> future = rpg.stopTransmitting(); final Future<?> future = rpg.stopTransmitting();
try {
transmitting.forEach(synchronizationOptions.getScheduledStateChangeListener()::onScheduledStateChange);
} catch (final Exception e) {
LOG.debug("Failed to notify listeners of ScheduledState changes", e);
}
waitForStopCompletion(future, rpg, timeout, synchronizationOptions.getComponentStopTimeoutAction()); waitForStopCompletion(future, rpg, timeout, synchronizationOptions.getComponentStopTimeoutAction());
final boolean proposedTransmitting = isTransmitting(proposed); final boolean proposedTransmitting = isTransmitting(proposed);
@ -2708,9 +2778,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
throw new FlowSynchronizationException("Failed to synchronize " + rpg + " with proposed version", e); throw new FlowSynchronizationException("Failed to synchronize " + rpg + " with proposed version", e);
} finally { } finally {
// Restart any components that need to be restarted. // Restart any components that need to be restarted.
for (final Connectable stoppedComponent : toRestart) { startComponents(toRestart, synchronizationOptions);
context.getComponentScheduler().startComponent(stoppedComponent);
}
} }
} finally { } finally {
synchronizationOptions.getComponentScheduler().resume(); synchronizationOptions.getComponentScheduler().resume();
@ -2966,7 +3034,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
} }
LOG.info("Components upstream of {} did not stop in time. Will terminate {}", connection, upstream); LOG.info("Components upstream of {} did not stop in time. Will terminate {}", connection, upstream);
terminateComponents(upstream); terminateComponents(upstream, synchronizationOptions);
stoppedComponents = upstream; stoppedComponents = upstream;
} }
@ -3006,9 +3074,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
} finally { } finally {
// If not removing the connection, restart any component that we stopped. // If not removing the connection, restart any component that we stopped.
if (proposedConnection != null) { if (proposedConnection != null) {
for (final Connectable component : stoppedComponents) { startComponents(stoppedComponents, synchronizationOptions);
context.getComponentScheduler().startComponent(component);
}
} }
} }
} }
@ -3030,7 +3096,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
} }
} }
private void terminateComponents(final Set<Connectable> components) { private void terminateComponents(final Set<Connectable> components, final FlowSynchronizationOptions synchronizationOptions) {
for (final Connectable component : components) { for (final Connectable component : components) {
if (!(component instanceof ProcessorNode)) { if (!(component instanceof ProcessorNode)) {
continue; continue;
@ -3043,6 +3109,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
processor.getProcessGroup().stopProcessor(processor); processor.getProcessGroup().stopProcessor(processor);
processor.terminate(); processor.terminate();
notifyScheduledStateChange((ComponentNode) processor, synchronizationOptions);
} }
} }

View File

@ -26,6 +26,7 @@ import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReloadComponent; import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.LoadBalanceStrategy; import org.apache.nifi.controller.queue.LoadBalanceStrategy;
@ -48,6 +49,7 @@ import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.groups.ComponentIdGenerator; import org.apache.nifi.groups.ComponentIdGenerator;
import org.apache.nifi.groups.ComponentScheduler; import org.apache.nifi.groups.ComponentScheduler;
import org.apache.nifi.groups.ScheduledStateChangeListener;
import org.apache.nifi.groups.FlowSynchronizationOptions; import org.apache.nifi.groups.FlowSynchronizationOptions;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.LogLevel; import org.apache.nifi.logging.LogLevel;
@ -77,6 +79,7 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -126,6 +129,7 @@ public class StandardVersionedComponentSynchronizerTest {
private ControllerServiceProvider controllerServiceProvider; private ControllerServiceProvider controllerServiceProvider;
private ParameterContextManager parameterContextManager; private ParameterContextManager parameterContextManager;
private ParameterReferenceManager parameterReferenceManager; private ParameterReferenceManager parameterReferenceManager;
private CapturingScheduledStateChangeListener scheduledStateChangeListener;
private final Set<String> queuesWithData = Collections.synchronizedSet(new HashSet<>()); private final Set<String> queuesWithData = Collections.synchronizedSet(new HashSet<>());
private final Bundle bundle = new Bundle("group", "artifact", "version 1.0"); private final Bundle bundle = new Bundle("group", "artifact", "version 1.0");
@ -197,10 +201,13 @@ public class StandardVersionedComponentSynchronizerTest {
when(group.getInputPorts()).thenReturn(Collections.singleton(inputPort)); when(group.getInputPorts()).thenReturn(Collections.singleton(inputPort));
when(group.getOutputPorts()).thenReturn(Collections.singleton(outputPort)); when(group.getOutputPorts()).thenReturn(Collections.singleton(outputPort));
scheduledStateChangeListener = new CapturingScheduledStateChangeListener();
synchronizationOptions = new FlowSynchronizationOptions.Builder() synchronizationOptions = new FlowSynchronizationOptions.Builder()
.componentIdGenerator(componentIdGenerator) .componentIdGenerator(componentIdGenerator)
.componentComparisonIdLookup(VersionedComponent::getIdentifier) .componentComparisonIdLookup(VersionedComponent::getIdentifier)
.componentScheduler(componentScheduler) .componentScheduler(componentScheduler)
.scheduledStateChangeListener(scheduledStateChangeListener)
.build(); .build();
synchronizer = new StandardVersionedComponentSynchronizer(context); synchronizer = new StandardVersionedComponentSynchronizer(context);
@ -213,6 +220,7 @@ public class StandardVersionedComponentSynchronizerTest {
.componentIdGenerator(componentIdGenerator) .componentIdGenerator(componentIdGenerator)
.componentComparisonIdLookup(VersionedComponent::getIdentifier) .componentComparisonIdLookup(VersionedComponent::getIdentifier)
.componentScheduler(componentScheduler) .componentScheduler(componentScheduler)
.scheduledStateChangeListener(scheduledStateChangeListener)
.componentStopTimeout(Duration.ofMillis(10)) .componentStopTimeout(Duration.ofMillis(10))
.componentStopTimeoutAction(timeoutAction) .componentStopTimeoutAction(timeoutAction)
.build(); .build();
@ -382,6 +390,8 @@ public class StandardVersionedComponentSynchronizerTest {
verify(connectionAB, times(1)).setName("Hello"); verify(connectionAB, times(1)).setName("Hello");
verify(connectionAB, times(1)).setRelationships(Collections.singleton(new Relationship.Builder().name("success").build())); verify(connectionAB, times(1)).setRelationships(Collections.singleton(new Relationship.Builder().name("success").build()));
scheduledStateChangeListener.assertNumProcessorUpdates(0);
} }
@Test @Test
@ -399,6 +409,8 @@ public class StandardVersionedComponentSynchronizerTest {
// Ensure that the source was stopped and restarted // Ensure that the source was stopped and restarted
verifyStopped(processorA); verifyStopped(processorA);
verifyRestarted(processorA); verifyRestarted(processorA);
verifyCallbackIndicatedRestart(processorA);
} }
@Test @Test
@ -420,6 +432,8 @@ public class StandardVersionedComponentSynchronizerTest {
// Ensure that the source was stopped and restarted // Ensure that the source was stopped and restarted
verifyStopped(processorA); verifyStopped(processorA);
verifyRestarted(processorA); verifyRestarted(processorA);
verifyCallbackIndicatedRestart(processorA);
} }
@Test @Test
@ -444,6 +458,29 @@ public class StandardVersionedComponentSynchronizerTest {
// Ensure that the source was stopped and restarted // Ensure that the source was stopped and restarted
verifyStopped(processorA); verifyStopped(processorA);
verifyNotRestarted(processorA); verifyNotRestarted(processorA);
verifyCallbackIndicatedStopOnly(processorA);
}
private void verifyCallbackIndicatedRestart(final ProcessorNode... processors) {
for (final ProcessorNode processor : processors) {
scheduledStateChangeListener.assertProcessorUpdates(new ScheduledStateUpdate<>(processor, org.apache.nifi.controller.ScheduledState.STOPPED),
new ScheduledStateUpdate<>(processor, org.apache.nifi.controller.ScheduledState.RUNNING));
}
scheduledStateChangeListener.assertNumProcessorUpdates(processors.length * 2);
}
private void verifyCallbackIndicatedStopOnly(final ProcessorNode... processors) {
for (final ProcessorNode processor : processors) {
scheduledStateChangeListener.assertProcessorUpdates(new ScheduledStateUpdate<>(processor, org.apache.nifi.controller.ScheduledState.STOPPED));
}
scheduledStateChangeListener.assertNumProcessorUpdates(processors.length);
}
private void verifyCallbackIndicatedStartOnly(final ProcessorNode... processors) {
for (final ProcessorNode processor : processors) {
scheduledStateChangeListener.assertProcessorUpdates(new ScheduledStateUpdate<>(processor, org.apache.nifi.controller.ScheduledState.RUNNING));
}
scheduledStateChangeListener.assertNumProcessorUpdates(processors.length);
} }
@Test @Test
@ -456,6 +493,7 @@ public class StandardVersionedComponentSynchronizerTest {
verifyStopped(processorA); verifyStopped(processorA);
verifyNotRestarted(processorA); verifyNotRestarted(processorA);
verify(group).removeConnection(connectionAB); verify(group).removeConnection(connectionAB);
verifyCallbackIndicatedStopOnly(processorA);
} }
@Test @Test
@ -477,6 +515,7 @@ public class StandardVersionedComponentSynchronizerTest {
// can be removed. // can be removed.
verifyStopped(processorA); verifyStopped(processorA);
verifyNotRestarted(processorA); verifyNotRestarted(processorA);
verifyCallbackIndicatedStopOnly(processorA);
} }
@Test @Test
@ -513,6 +552,7 @@ public class StandardVersionedComponentSynchronizerTest {
// Ensure that the source was stopped, destination was stopped, and the connection was removed. // Ensure that the source was stopped, destination was stopped, and the connection was removed.
verifyStopped(processorA); verifyStopped(processorA);
verifyNotRestarted(processorA); verifyNotRestarted(processorA);
verifyCallbackIndicatedStopOnly(processorB, processorA);
verifyStopped(processorB); verifyStopped(processorB);
verifyNotRestarted(processorB); verifyNotRestarted(processorB);
verify(group, times(1)).removeConnection(connectionAB); verify(group, times(1)).removeConnection(connectionAB);
@ -1032,4 +1072,73 @@ public class StandardVersionedComponentSynchronizerTest {
return versionedPort; return versionedPort;
} }
private class ScheduledStateUpdate<T> {
private T component;
private org.apache.nifi.controller.ScheduledState state;
public ScheduledStateUpdate(T component, org.apache.nifi.controller.ScheduledState state) {
this.component = component;
this.state = state;
}
}
private class ControllerServiceStateUpdate {
private ControllerServiceNode controllerService;
private ControllerServiceState state;
public ControllerServiceStateUpdate(ControllerServiceNode controllerService, ControllerServiceState state) {
this.controllerService = controllerService;
this.state = state;
}
}
private class CapturingScheduledStateChangeListener implements ScheduledStateChangeListener {
private List<ScheduledStateUpdate<ProcessorNode>> processorUpdates = new ArrayList<>();
private List<ScheduledStateUpdate<Port>> portUpdates = new ArrayList<>();
private List<ControllerServiceStateUpdate> serviceUpdates = new ArrayList<>();
private List<ScheduledStateUpdate<ReportingTaskNode>> reportingTaskUpdates = new ArrayList<>();
@Override
public void onScheduledStateChange(final ProcessorNode processor) {
processorUpdates.add(new ScheduledStateUpdate<>(processor, processor.getScheduledState()));
}
@Override
public void onScheduledStateChange(ControllerServiceNode controllerService) {
serviceUpdates.add(new ControllerServiceStateUpdate(controllerService, controllerService.getState()));
}
@Override
public void onScheduledStateChange(ReportingTaskNode reportingTask) {
reportingTaskUpdates.add(new ScheduledStateUpdate<>(reportingTask, reportingTask.getScheduledState()));
}
@Override
public void onScheduledStateChange(final Port port) {
portUpdates.add(new ScheduledStateUpdate<>(port, port.getScheduledState()));
}
void assertNumProcessorUpdates(int expectedNum) {
assertEquals("Expected " + expectedNum + " processor state changes", expectedNum, processorUpdates.size());
}
void assertProcessorUpdates(final ScheduledStateUpdate<ProcessorNode>... updates) {
final Iterator<ScheduledStateUpdate<ProcessorNode>> it = processorUpdates.iterator();
for (final ScheduledStateUpdate<ProcessorNode> expectedUpdate : updates) {
final ScheduledStateUpdate<ProcessorNode> capturedUpdate = it.next();
assertEquals(expectedUpdate.component.getName(), capturedUpdate.component.getName());
if (expectedUpdate.state == org.apache.nifi.controller.ScheduledState.RUNNING) {
verifyRestarted(capturedUpdate.component);
} else if (expectedUpdate.state == org.apache.nifi.controller.ScheduledState.STOPPED) {
verifyStopped(capturedUpdate.component);
}
}
}
void assertNumPortUpdates(int expectedNum) {
assertEquals("Expected " + expectedNum + " port state changes", expectedNum, portUpdates.size());
}
}
} }

View File

@ -35,6 +35,7 @@ public class FlowSynchronizationOptions {
private final boolean updateRpgUrls; private final boolean updateRpgUrls;
private final Duration componentStopTimeout; private final Duration componentStopTimeout;
private final ComponentStopTimeoutAction timeoutAction; private final ComponentStopTimeoutAction timeoutAction;
private final ScheduledStateChangeListener scheduledStateChangeListener;
private FlowSynchronizationOptions(final Builder builder) { private FlowSynchronizationOptions(final Builder builder) {
this.componentIdGenerator = builder.componentIdGenerator; this.componentIdGenerator = builder.componentIdGenerator;
@ -49,6 +50,7 @@ public class FlowSynchronizationOptions {
this.updateRpgUrls = builder.updateRpgUrls; this.updateRpgUrls = builder.updateRpgUrls;
this.componentStopTimeout = builder.componentStopTimeout; this.componentStopTimeout = builder.componentStopTimeout;
this.timeoutAction = builder.timeoutAction; this.timeoutAction = builder.timeoutAction;
this.scheduledStateChangeListener = builder.scheduledStateChangeListener;
} }
public ComponentIdGenerator getComponentIdGenerator() { public ComponentIdGenerator getComponentIdGenerator() {
@ -99,6 +101,10 @@ public class FlowSynchronizationOptions {
return timeoutAction; return timeoutAction;
} }
public ScheduledStateChangeListener getScheduledStateChangeListener() {
return scheduledStateChangeListener;
}
public static class Builder { public static class Builder {
private ComponentIdGenerator componentIdGenerator; private ComponentIdGenerator componentIdGenerator;
private Function<VersionedComponent, String> componentComparisonIdLookup; private Function<VersionedComponent, String> componentComparisonIdLookup;
@ -109,6 +115,7 @@ public class FlowSynchronizationOptions {
private boolean updateGroupVersionControlSnapshot = true; private boolean updateGroupVersionControlSnapshot = true;
private boolean updateExistingVariables = false; private boolean updateExistingVariables = false;
private boolean updateRpgUrls = false; private boolean updateRpgUrls = false;
private ScheduledStateChangeListener scheduledStateChangeListener;
private PropertyDecryptor propertyDecryptor = value -> value; private PropertyDecryptor propertyDecryptor = value -> value;
private Duration componentStopTimeout = Duration.ofSeconds(30); private Duration componentStopTimeout = Duration.ofSeconds(30);
private ComponentStopTimeoutAction timeoutAction = ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION; private ComponentStopTimeoutAction timeoutAction = ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION;
@ -247,6 +254,15 @@ public class FlowSynchronizationOptions {
return this; return this;
} }
/**
* Specifies a callback whose methods will be called when component scheduled states are updated by the synchronizer
* @param listener the ScheduledStateChangeListener to use
* @return the builder
*/
public Builder scheduledStateChangeListener(final ScheduledStateChangeListener listener) {
this.scheduledStateChangeListener = listener;
return this;
}
public FlowSynchronizationOptions build() { public FlowSynchronizationOptions build() {
if (componentIdGenerator == null) { if (componentIdGenerator == null) {
@ -258,6 +274,9 @@ public class FlowSynchronizationOptions {
if (componentScheduler == null) { if (componentScheduler == null) {
throw new IllegalStateException("Must set Component Scheduler"); throw new IllegalStateException("Must set Component Scheduler");
} }
if (scheduledStateChangeListener == null) {
scheduledStateChangeListener = ScheduledStateChangeListener.EMPTY;
}
return new FlowSynchronizationOptions(this); return new FlowSynchronizationOptions(this);
} }
@ -276,6 +295,7 @@ public class FlowSynchronizationOptions {
builder.propertyDecryptor = options.getPropertyDecryptor(); builder.propertyDecryptor = options.getPropertyDecryptor();
builder.componentStopTimeout = options.getComponentStopTimeout(); builder.componentStopTimeout = options.getComponentStopTimeout();
builder.timeoutAction = options.getComponentStopTimeoutAction(); builder.timeoutAction = options.getComponentStopTimeoutAction();
builder.scheduledStateChangeListener = options.getScheduledStateChangeListener();
return builder; return builder;
} }

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.groups;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
public interface ScheduledStateChangeListener {
void onScheduledStateChange(ProcessorNode processor);
void onScheduledStateChange(Port port);
void onScheduledStateChange(ControllerServiceNode controllerService);
void onScheduledStateChange(ReportingTaskNode reportingTask);
ScheduledStateChangeListener EMPTY = new ScheduledStateChangeListener() {
@Override
public void onScheduledStateChange(ProcessorNode processor) {
}
@Override
public void onScheduledStateChange(Port port) {
}
@Override
public void onScheduledStateChange(ControllerServiceNode controllerService) {
}
@Override
public void onScheduledStateChange(ReportingTaskNode reportingTask) {
}
};
}