NIFI-10515: If propopsed controller service is disabled, ensure that we don't re-enable the service and references when synchronize(ControllerServiceNode...) is called

Signed-off-by: Joe Gresock <jgresock@gmail.com>

This closes #6425.
This commit is contained in:
Mark Payne 2022-09-16 13:45:56 -04:00 committed by Joe Gresock
parent abbd7a89c8
commit eaa275a5e4
No known key found for this signature in database
GPG Key ID: 37F5B9B6E258C8B7
2 changed files with 49 additions and 10 deletions

View File

@ -80,10 +80,9 @@ import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterContextManager;
import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.parameter.ParameterReferenceManager;
import org.apache.nifi.parameter.ParameterReferencedControllerServiceData;
import org.apache.nifi.parameter.ParameterProviderConfiguration;
import org.apache.nifi.parameter.ParameterReferenceManager;
import org.apache.nifi.parameter.ParameterReferencedControllerServiceData;
import org.apache.nifi.parameter.StandardParameterProviderConfiguration;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
@ -1160,14 +1159,18 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
throw new FlowSynchronizationException("Failed to synchronize Controller Service " + controllerService + " with proposed version", e);
}
} finally {
// Re-enable the controller service if necessary
serviceProvider.enableControllerServicesAsync(servicesToRestart);
notifyScheduledStateChange(servicesToRestart, synchronizationOptions);
// If the intent was to remove the Controller Service, or to disable it, then anything that was previously referencing it should remain stopped.
// However, if the intended state for the Controller Service is to be ENABLED, go ahead and re-enable/restart what we've stopped/disabled.
if (proposed != null && proposed.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) {
// Re-enable the controller service if necessary
serviceProvider.enableControllerServicesAsync(servicesToRestart);
notifyScheduledStateChange(servicesToRestart, synchronizationOptions);
// Restart any components that need to be restarted.
if (controllerService != null) {
serviceProvider.scheduleReferencingComponents(controllerService, referencesToRestart, context.getComponentScheduler());
referencesToRestart.forEach(componentNode -> notifyScheduledStateChange(componentNode, synchronizationOptions));
// Restart any components that need to be restarted.
if (controllerService != null) {
serviceProvider.scheduleReferencingComponents(controllerService, referencesToRestart, context.getComponentScheduler());
referencesToRestart.forEach(componentNode -> notifyScheduledStateChange(componentNode, synchronizationOptions));
}
}
}
} finally {

View File

@ -49,9 +49,9 @@ import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.groups.ComponentIdGenerator;
import org.apache.nifi.groups.ComponentScheduler;
import org.apache.nifi.groups.ScheduledStateChangeListener;
import org.apache.nifi.groups.FlowSynchronizationOptions;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.ScheduledStateChangeListener;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.Parameter;
@ -72,6 +72,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.time.Duration;
import java.util.ArrayList;
@ -677,6 +679,40 @@ public class StandardVersionedComponentSynchronizerTest {
verify(service).setName("Hello");
}
@Test
public void testReferencesNotRestartedWhenServiceStopped() throws FlowSynchronizationException, InterruptedException, TimeoutException {
final ControllerServiceNode service = createMockControllerService();
when(service.isActive()).thenReturn(true);
when(service.getState()).thenReturn(ControllerServiceState.ENABLED);
// Make Processors A and B reference the controller service and start them
setReferences(service, processorA, processorB);
startProcessor(processorB);
when(controllerServiceProvider.unscheduleReferencingComponents(service)).thenReturn(Collections.singletonMap(processorB, CompletableFuture.completedFuture(null)));
when(controllerServiceProvider.disableControllerServicesAsync(anyCollection())).thenReturn(CompletableFuture.completedFuture(null));
final VersionedControllerService versionedControllerService = createMinimalVersionedControllerService();
versionedControllerService.setName("Hello");
versionedControllerService.setScheduledState(ScheduledState.DISABLED);
synchronizer.synchronize(service, versionedControllerService, group, synchronizationOptions);
verify(controllerServiceProvider).unscheduleReferencingComponents(service);
verify(controllerServiceProvider).disableControllerServicesAsync(Collections.singleton(service));
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(final InvocationOnMock invocationOnMock) {
final Set<?> services = invocationOnMock.getArgument(0);
assertTrue(services.isEmpty());
return null;
}
}).when(controllerServiceProvider).enableControllerServicesAsync(Mockito.anySet());
verify(controllerServiceProvider, times(0)).scheduleReferencingComponents(Mockito.any(ControllerServiceNode.class), Mockito.anySet(), Mockito.any(ComponentScheduler.class));
}
@Test
public void testTerminateReferenceOnTimeout() throws FlowSynchronizationException, InterruptedException, TimeoutException {
final ControllerServiceNode service = createMockControllerService();