From eaa275a5e404d10358adf495802dc2bf829289b5 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 16 Sep 2022 13:45:56 -0400 Subject: [PATCH] NIFI-10515: If propopsed controller service is disabled, ensure that we don't re-enable the service and references when synchronize(ControllerServiceNode...) is called Signed-off-by: Joe Gresock This closes #6425. --- ...tandardVersionedComponentSynchronizer.java | 21 +++++----- ...ardVersionedComponentSynchronizerTest.java | 38 ++++++++++++++++++- 2 files changed, 49 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java index 4d23cc6fa1..47d4077bcb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java @@ -80,10 +80,9 @@ import org.apache.nifi.parameter.Parameter; import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.parameter.ParameterContextManager; import org.apache.nifi.parameter.ParameterDescriptor; -import org.apache.nifi.parameter.ParameterReferenceManager; -import org.apache.nifi.parameter.ParameterReferencedControllerServiceData; import org.apache.nifi.parameter.ParameterProviderConfiguration; import org.apache.nifi.parameter.ParameterReferenceManager; +import org.apache.nifi.parameter.ParameterReferencedControllerServiceData; import org.apache.nifi.parameter.StandardParameterProviderConfiguration; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; @@ -1160,14 +1159,18 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen throw new FlowSynchronizationException("Failed to synchronize Controller Service " + controllerService + " with proposed version", e); } } finally { - // Re-enable the controller service if necessary - serviceProvider.enableControllerServicesAsync(servicesToRestart); - notifyScheduledStateChange(servicesToRestart, synchronizationOptions); + // If the intent was to remove the Controller Service, or to disable it, then anything that was previously referencing it should remain stopped. + // However, if the intended state for the Controller Service is to be ENABLED, go ahead and re-enable/restart what we've stopped/disabled. + if (proposed != null && proposed.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) { + // Re-enable the controller service if necessary + serviceProvider.enableControllerServicesAsync(servicesToRestart); + notifyScheduledStateChange(servicesToRestart, synchronizationOptions); - // Restart any components that need to be restarted. - if (controllerService != null) { - serviceProvider.scheduleReferencingComponents(controllerService, referencesToRestart, context.getComponentScheduler()); - referencesToRestart.forEach(componentNode -> notifyScheduledStateChange(componentNode, synchronizationOptions)); + // Restart any components that need to be restarted. + if (controllerService != null) { + serviceProvider.scheduleReferencingComponents(controllerService, referencesToRestart, context.getComponentScheduler()); + referencesToRestart.forEach(componentNode -> notifyScheduledStateChange(componentNode, synchronizationOptions)); + } } } } finally { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java index 98a6d8a8e8..3ed688c6ab 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizerTest.java @@ -49,9 +49,9 @@ import org.apache.nifi.flow.VersionedPort; import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.groups.ComponentIdGenerator; import org.apache.nifi.groups.ComponentScheduler; -import org.apache.nifi.groups.ScheduledStateChangeListener; import org.apache.nifi.groups.FlowSynchronizationOptions; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.ScheduledStateChangeListener; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.parameter.Parameter; @@ -72,6 +72,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.time.Duration; import java.util.ArrayList; @@ -677,6 +679,40 @@ public class StandardVersionedComponentSynchronizerTest { verify(service).setName("Hello"); } + @Test + public void testReferencesNotRestartedWhenServiceStopped() throws FlowSynchronizationException, InterruptedException, TimeoutException { + final ControllerServiceNode service = createMockControllerService(); + when(service.isActive()).thenReturn(true); + when(service.getState()).thenReturn(ControllerServiceState.ENABLED); + + // Make Processors A and B reference the controller service and start them + setReferences(service, processorA, processorB); + startProcessor(processorB); + + when(controllerServiceProvider.unscheduleReferencingComponents(service)).thenReturn(Collections.singletonMap(processorB, CompletableFuture.completedFuture(null))); + when(controllerServiceProvider.disableControllerServicesAsync(anyCollection())).thenReturn(CompletableFuture.completedFuture(null)); + + final VersionedControllerService versionedControllerService = createMinimalVersionedControllerService(); + versionedControllerService.setName("Hello"); + versionedControllerService.setScheduledState(ScheduledState.DISABLED); + + synchronizer.synchronize(service, versionedControllerService, group, synchronizationOptions); + + verify(controllerServiceProvider).unscheduleReferencingComponents(service); + verify(controllerServiceProvider).disableControllerServicesAsync(Collections.singleton(service)); + + Mockito.doAnswer(new Answer() { + @Override + public Void answer(final InvocationOnMock invocationOnMock) { + final Set services = invocationOnMock.getArgument(0); + assertTrue(services.isEmpty()); + return null; + } + }).when(controllerServiceProvider).enableControllerServicesAsync(Mockito.anySet()); + + verify(controllerServiceProvider, times(0)).scheduleReferencingComponents(Mockito.any(ControllerServiceNode.class), Mockito.anySet(), Mockito.any(ComponentScheduler.class)); + } + @Test public void testTerminateReferenceOnTimeout() throws FlowSynchronizationException, InterruptedException, TimeoutException { final ControllerServiceNode service = createMockControllerService();