NIFI-10518: Adding intended state to ScheduledStateChangeListener (#6428)

- NIFI-10518: Adding intended state to ScheduledStateChangeListener
- Notifying of scheduled state change when transitionComponentState is called
- Notifying scheduled state change when reporting task state is changed
- Notifying scheduledState changes for remote group port start/stop components calls
This commit is contained in:
Joe Gresock 2022-09-19 10:38:00 -04:00 committed by GitHub
parent 35fb66f50f
commit 43931b1504
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 64 additions and 39 deletions

View File

@ -35,6 +35,7 @@ import org.apache.nifi.controller.PropertyConfiguration;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.Triggerable;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.FlowFileQueue;
@ -1164,12 +1165,12 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
if (proposed != null && proposed.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) {
// Re-enable the controller service if necessary
serviceProvider.enableControllerServicesAsync(servicesToRestart);
notifyScheduledStateChange(servicesToRestart, synchronizationOptions);
notifyScheduledStateChange(servicesToRestart, synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED);
// Restart any components that need to be restarted.
if (controllerService != null) {
serviceProvider.scheduleReferencingComponents(controllerService, referencesToRestart, context.getComponentScheduler());
referencesToRestart.forEach(componentNode -> notifyScheduledStateChange(componentNode, synchronizationOptions));
referencesToRestart.forEach(componentNode -> notifyScheduledStateChange(componentNode, synchronizationOptions, org.apache.nifi.flow.ScheduledState.RUNNING));
}
}
}
@ -1499,7 +1500,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
for (final ComponentNode stoppedComponent : componentsToRestart) {
if (stoppedComponent instanceof Connectable) {
context.getComponentScheduler().startComponent((Connectable) stoppedComponent);
notifyScheduledStateChange(stoppedComponent, synchronizationOptions);
notifyScheduledStateChange(stoppedComponent, synchronizationOptions, org.apache.nifi.flow.ScheduledState.RUNNING);
}
}
}
@ -1555,7 +1556,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
// Disable all Controller Services
final Collection<ControllerServiceNode> controllerServices = processGroup.findAllControllerServices();
final Future<Void> disableServicesFuture = context.getControllerServiceProvider().disableControllerServicesAsync(controllerServices);
notifyScheduledStateChange(controllerServices, synchronizationOptions);
notifyScheduledStateChange(controllerServices, synchronizationOptions, org.apache.nifi.flow.ScheduledState.DISABLED);
try {
disableServicesFuture.get(timeout, TimeUnit.MILLISECONDS);
} catch (final ExecutionException ee) {
@ -1658,7 +1659,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
// Stop all necessary enabled/active Controller Services
final Future<Void> serviceDisableFuture = context.getControllerServiceProvider().disableControllerServicesAsync(controllerServicesToStop);
notifyScheduledStateChange(controllerServicesToStop, synchronizationOptions);
notifyScheduledStateChange(controllerServicesToStop, synchronizationOptions, org.apache.nifi.flow.ScheduledState.DISABLED);
try {
serviceDisableFuture.get(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
@ -1686,11 +1687,11 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
} finally {
// Re-enable all Controller Services that we disabled and restart all processors
context.getControllerServiceProvider().enableControllerServicesAsync(controllerServicesToStop);
notifyScheduledStateChange(controllerServicesToStop, synchronizationOptions);
notifyScheduledStateChange(controllerServicesToStop, synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED);
for (final ProcessorNode processor : processorsToStop) {
processor.getProcessGroup().startProcessor(processor, false);
notifyScheduledStateChange((ComponentNode) processor,synchronizationOptions);
notifyScheduledStateChange((ComponentNode) processor,synchronizationOptions, org.apache.nifi.flow.ScheduledState.RUNNING);
}
}
} finally {
@ -2257,7 +2258,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
private void startComponents(final Collection<Connectable> stoppedComponents, final FlowSynchronizationOptions synchronizationOptions) {
for (final Connectable stoppedComponent : stoppedComponents) {
context.getComponentScheduler().startComponent(stoppedComponent);
notifyScheduledStateChange(stoppedComponent, synchronizationOptions);
notifyScheduledStateChange(stoppedComponent, synchronizationOptions, org.apache.nifi.flow.ScheduledState.RUNNING);
}
}
@ -2269,6 +2270,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
port.setMaxConcurrentTasks(proposed.getConcurrentlySchedulableTaskCount());
context.getComponentScheduler().transitionComponentState(port, proposed.getScheduledState());
notifyScheduledStateChange(port, syncOptions, proposed.getScheduledState());
}
private Port addInputPort(final ProcessGroup destination, final VersionedPort proposed, final ComponentIdGenerator componentIdGenerator, final String temporaryName) {
@ -2490,45 +2492,62 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
return stoppedComponents;
}
private void notifyScheduledStateChange(final Connectable component, final FlowSynchronizationOptions synchronizationOptions) {
private void notifyScheduledStateChange(final Connectable component, final FlowSynchronizationOptions synchronizationOptions, final org.apache.nifi.flow.ScheduledState intendedState) {
try {
if (component instanceof ProcessorNode) {
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ProcessorNode) component);
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ProcessorNode) component, intendedState);
} else if (component instanceof Port) {
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((Port) component);
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((Port) component, intendedState);
}
} catch (final Exception e) {
LOG.debug("Failed to notify listeners of ScheduledState changes", e);
}
}
private void notifyScheduledStateChange(final ComponentNode component, final FlowSynchronizationOptions synchronizationOptions) {
private void notifyScheduledStateChange(final ComponentNode component, final FlowSynchronizationOptions synchronizationOptions, final org.apache.nifi.flow.ScheduledState intendedState) {
if (component instanceof Triggerable && intendedState == org.apache.nifi.flow.ScheduledState.RUNNING && ((Triggerable) component).getScheduledState() == ScheduledState.DISABLED) {
return;
}
try {
if (component instanceof ProcessorNode) {
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ProcessorNode) component);
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ProcessorNode) component, intendedState);
} else if (component instanceof Port) {
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((Port) component);
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((Port) component, intendedState);
} else if (component instanceof ControllerServiceNode) {
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ControllerServiceNode) component);
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ControllerServiceNode) component, intendedState);
} else if (component instanceof ReportingTaskNode) {
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange((ReportingTaskNode) component);
final ReportingTaskNode reportingTaskNode = (ReportingTaskNode) component;
if (intendedState == org.apache.nifi.flow.ScheduledState.RUNNING && reportingTaskNode.getScheduledState() == ScheduledState.DISABLED) {
return;
}
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(reportingTaskNode, intendedState);
}
} catch (final Exception e) {
LOG.debug("Failed to notify listeners of ScheduledState changes", e);
}
}
private void notifyScheduledStateChange(final Collection<ControllerServiceNode> servicesToRestart, final FlowSynchronizationOptions synchronizationOptions) {
private void notifyScheduledStateChange(final Collection<ControllerServiceNode> servicesToRestart, final FlowSynchronizationOptions synchronizationOptions,
final org.apache.nifi.flow.ScheduledState intendedState) {
try {
servicesToRestart.forEach(synchronizationOptions.getScheduledStateChangeListener()::onScheduledStateChange);
servicesToRestart.forEach(service -> {
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(service, intendedState);
if (intendedState == org.apache.nifi.flow.ScheduledState.DISABLED) {
service.getReferences().findRecursiveReferences(ControllerServiceNode.class)
.forEach(reference -> synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(reference, org.apache.nifi.flow.ScheduledState.DISABLED));
} else if (intendedState == org.apache.nifi.flow.ScheduledState.ENABLED) {
service.getRequiredControllerServices().forEach(requiredService -> synchronizationOptions.getScheduledStateChangeListener()
.onScheduledStateChange(requiredService, org.apache.nifi.flow.ScheduledState.ENABLED));
}
});
} catch (final Exception e) {
LOG.debug("Failed to notify listeners of ScheduledState changes", e);
}
}
private void notifyScheduledStateChange(final Port inputPort, final FlowSynchronizationOptions synchronizationOptions) {
private void notifyScheduledStateChange(final Port inputPort, final FlowSynchronizationOptions synchronizationOptions, final org.apache.nifi.flow.ScheduledState intendedState) {
try {
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(inputPort);
synchronizationOptions.getScheduledStateChangeListener().onScheduledStateChange(inputPort, intendedState);
} catch (final Exception e) {
LOG.debug("Failed to notify listeners of ScheduledState changes", e);
}
@ -2544,12 +2563,12 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
case INPUT_PORT:
final Port inputPort = (Port) component;
component.getProcessGroup().stopInputPort(inputPort);
notifyScheduledStateChange(inputPort, synchronizationOptions);
notifyScheduledStateChange(inputPort, synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED);
return true;
case OUTPUT_PORT:
final Port outputPort = (Port) component;
component.getProcessGroup().stopOutputPort(outputPort);
notifyScheduledStateChange(outputPort, synchronizationOptions);
notifyScheduledStateChange(outputPort, synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED);
return true;
case PROCESSOR:
final ProcessorNode processorNode = (ProcessorNode) component;
@ -2574,7 +2593,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
return true;
}
} finally {
notifyScheduledStateChange((ComponentNode) processor, synchronizationOptions);
notifyScheduledStateChange((ComponentNode) processor, synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED);
}
}
@ -2612,7 +2631,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
final Future<Void> future = entry.getValue();
waitForStopCompletion(future, component, timeout, timeoutAction);
notifyScheduledStateChange(component, synchronizationOptions);
notifyScheduledStateChange(component, synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED);
}
if (controllerService.isActive()) {
@ -2636,7 +2655,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
// Disable the service and wait for completion, up to the timeout allowed
final Future<Void> future = serviceProvider.disableControllerServicesAsync(servicesToStop);
waitForStopCompletion(future, controllerService, timeout, timeoutAction);
notifyScheduledStateChange(servicesToStop, synchronizationOptions);
notifyScheduledStateChange(servicesToStop, synchronizationOptions, org.apache.nifi.flow.ScheduledState.DISABLED);
}
}
@ -2689,6 +2708,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
// Transition state to disabled/enabled/running
context.getComponentScheduler().transitionComponentState(processor, proposed.getScheduledState());
notifyScheduledStateChange((ComponentNode) processor, syncOptions, proposed.getScheduledState());
if (!isEqual(processor.getBundleCoordinate(), proposed.getBundle())) {
final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
@ -2739,7 +2759,8 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
final Future<?> future = rpg.stopTransmitting();
try {
transmitting.forEach(synchronizationOptions.getScheduledStateChangeListener()::onScheduledStateChange);
transmitting.forEach(remoteGroupPort -> synchronizationOptions.getScheduledStateChangeListener()
.onScheduledStateChange(remoteGroupPort, org.apache.nifi.flow.ScheduledState.ENABLED));
} catch (final Exception e) {
LOG.debug("Failed to notify listeners of ScheduledState changes", e);
}
@ -2947,10 +2968,12 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
if (versionedPort.getScheduledState() == org.apache.nifi.flow.ScheduledState.RUNNING) {
if (portState != ScheduledState.RUNNING) {
context.getComponentScheduler().startComponent(remoteGroupPort);
notifyScheduledStateChange(remoteGroupPort, syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING);
}
} else {
if (portState == ScheduledState.RUNNING) {
context.getComponentScheduler().stopComponent(remoteGroupPort);
notifyScheduledStateChange(remoteGroupPort, syncOptions, org.apache.nifi.flow.ScheduledState.ENABLED);
}
}
}
@ -3112,7 +3135,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
processor.getProcessGroup().stopProcessor(processor);
processor.terminate();
notifyScheduledStateChange((ComponentNode) processor, synchronizationOptions);
notifyScheduledStateChange((ComponentNode) processor, synchronizationOptions, org.apache.nifi.flow.ScheduledState.ENABLED);
}
}
@ -3419,6 +3442,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
}
break;
}
notifyScheduledStateChange(reportingTask, syncOptions, proposed.getScheduledState());
} finally {
reportingTask.resumeValidationTrigger();
}

View File

@ -1137,22 +1137,22 @@ public class StandardVersionedComponentSynchronizerTest {
private List<ScheduledStateUpdate<ReportingTaskNode>> reportingTaskUpdates = new ArrayList<>();
@Override
public void onScheduledStateChange(final ProcessorNode processor) {
public void onScheduledStateChange(final ProcessorNode processor, final ScheduledState intendedState) {
processorUpdates.add(new ScheduledStateUpdate<>(processor, processor.getScheduledState()));
}
@Override
public void onScheduledStateChange(ControllerServiceNode controllerService) {
public void onScheduledStateChange(ControllerServiceNode controllerService, final ScheduledState intendedState) {
serviceUpdates.add(new ControllerServiceStateUpdate(controllerService, controllerService.getState()));
}
@Override
public void onScheduledStateChange(ReportingTaskNode reportingTask) {
public void onScheduledStateChange(ReportingTaskNode reportingTask, final ScheduledState intendedState) {
reportingTaskUpdates.add(new ScheduledStateUpdate<>(reportingTask, reportingTask.getScheduledState()));
}
@Override
public void onScheduledStateChange(final Port port) {
public void onScheduledStateChange(final Port port, final ScheduledState intendedState) {
portUpdates.add(new ScheduledStateUpdate<>(port, port.getScheduledState()));
}

View File

@ -21,34 +21,35 @@ import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flow.ScheduledState;
public interface ScheduledStateChangeListener {
void onScheduledStateChange(ProcessorNode processor);
void onScheduledStateChange(ProcessorNode processor, ScheduledState intendedState);
void onScheduledStateChange(Port port);
void onScheduledStateChange(Port port, ScheduledState intendedState);
void onScheduledStateChange(ControllerServiceNode controllerService);
void onScheduledStateChange(ControllerServiceNode controllerService, ScheduledState intendedState);
void onScheduledStateChange(ReportingTaskNode reportingTask);
void onScheduledStateChange(ReportingTaskNode reportingTask, ScheduledState intendedState);
ScheduledStateChangeListener EMPTY = new ScheduledStateChangeListener() {
@Override
public void onScheduledStateChange(ProcessorNode processor) {
public void onScheduledStateChange(ProcessorNode processor, ScheduledState intendedState) {
}
@Override
public void onScheduledStateChange(Port port) {
public void onScheduledStateChange(Port port, ScheduledState intendedState) {
}
@Override
public void onScheduledStateChange(ControllerServiceNode controllerService) {
public void onScheduledStateChange(ControllerServiceNode controllerService, ScheduledState intendedState) {
}
@Override
public void onScheduledStateChange(ReportingTaskNode reportingTask) {
public void onScheduledStateChange(ReportingTaskNode reportingTask, ScheduledState intendedState) {
}
};