NIFI-11563: Allowing source connectables to be restarted on new connections in the StandardVersionedComponentSynchronizer (#7261)

This commit is contained in:
Joe Gresock 2023-05-19 08:24:13 -04:00 committed by GitHub
parent 6ee4632267
commit 28f003d886
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 2 deletions

View File

@ -50,6 +50,7 @@ import org.apache.nifi.flow.BatchSize;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.ComponentType;
import org.apache.nifi.flow.ConnectableComponent;
import org.apache.nifi.flow.ConnectableComponentType;
import org.apache.nifi.flow.ParameterProviderReference;
import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedConnection;
@ -3194,6 +3195,26 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
}
}
private Set<Connectable> getUpstreamComponents(final VersionedConnection connection) {
if (connection == null) {
return Collections.emptySet();
}
final Set<Connectable> components = new HashSet<>();
findUpstreamComponents(connection, components);
return components;
}
private void findUpstreamComponents(final VersionedConnection connection, final Set<Connectable> components) {
final ConnectableComponent sourceConnectable = connection.getSource();
final Connectable source = context.getFlowManager().findConnectable(sourceConnectable.getId());
if (sourceConnectable.getType() == ConnectableComponentType.FUNNEL) {
source.getIncomingConnections().forEach(incoming -> findUpstreamComponents(incoming, components));
} else {
components.add(source);
}
}
@Override
public void synchronize(final Connection connection, final VersionedConnection proposedConnection, final ProcessGroup group, final FlowSynchronizationOptions synchronizationOptions)
throws FlowSynchronizationException, TimeoutException {
@ -3205,7 +3226,10 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
final long timeout = System.currentTimeMillis() + synchronizationOptions.getComponentStopTimeout().toMillis();
// Stop any upstream components so that we can update the connection
final Set<Connectable> upstream = getUpstreamComponents(connection);
final Set<Connectable> upstream = new HashSet<>(getUpstreamComponents(connection));
if (connection == null) {
upstream.addAll(getUpstreamComponents(proposedConnection));
}
Set<Connectable> stoppedComponents;
try {
stoppedComponents = stopOrTerminate(upstream, timeout, synchronizationOptions);

View File

@ -140,6 +140,7 @@ public class StandardVersionedComponentSynchronizerTest {
private CapturingScheduledStateChangeListener scheduledStateChangeListener;
private ControllerServiceNode controllerServiceNode;
private BundleCoordinate bundleCoordinate;
private FlowManager flowManager;
private final Set<String> queuesWithData = Collections.synchronizedSet(new HashSet<>());
private final Bundle bundle = new Bundle("group", "artifact", "version 1.0");
@ -147,7 +148,7 @@ public class StandardVersionedComponentSynchronizerTest {
@BeforeEach
public void setup() {
final ExtensionManager extensionManager = Mockito.mock(ExtensionManager.class);
final FlowManager flowManager = Mockito.mock(FlowManager.class);
flowManager = Mockito.mock(FlowManager.class);
controllerServiceProvider = Mockito.mock(ControllerServiceProvider.class);
final Function<ProcessorNode, ProcessContext> processContextFactory = proc -> Mockito.mock(ProcessContext.class);
final ReloadComponent reloadComponent = Mockito.mock(ReloadComponent.class);
@ -459,6 +460,31 @@ public class StandardVersionedComponentSynchronizerTest {
verifyCallbackIndicatedRestart(processorA);
}
@Test
public void testSourceStoppedForNewConnection() throws FlowSynchronizationException, TimeoutException {
when(flowManager.findConnectable(processorA.getIdentifier())).thenReturn(processorA);
startProcessor(processorA, true);
final VersionedConnection versionedConnection = createMinimalVersionedConnection(processorA, processorB);
versionedConnection.setName("Hello");
synchronizer.synchronize(connectionAB, versionedConnection, group, synchronizationOptions);
// Ensure that we terminate the source
verify(group, times(1)).stopProcessor(processorA);
// Ensure that the update occurred
verify(connectionAB, times(1)).setName("Hello");
// Ensure that the source was stopped and restarted
verifyStopped(processorA);
verifyRestarted(processorA);
verifyCallbackIndicatedRestart(processorA);
}
@Test
public void testTimeoutStoppingSource() {
startProcessor(processorA, false);