diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java index ee8d9b4fdd..2fdb762ad0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.controller; +import java.util.List; + import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; @@ -162,6 +164,12 @@ public interface ProcessScheduler { */ void enableControllerService(ControllerServiceNode service); + /** + * Disables all of the given Controller Services in the order provided by the List + * @param services the controller services to disable + */ + void disableControllerServices(List services); + /** * Disables the Controller Service so that it can be updated * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java index 10933dbc34..481a23181c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java @@ -23,18 +23,57 @@ import org.apache.nifi.controller.ControllerService; public interface ControllerServiceNode extends ConfiguredComponent { + /** + *

+ * Returns a proxy implementation of the Controller Service that this ControllerServiceNode + * encapsulates. The object returned by this method may be passed to other components, as + * required. Invocations of methods on the object returned will be intercepted and the service's + * state will be verified before the underlying implementation's method is called. + *

+ * + * @return a proxied ControllerService that can be addressed outside of the framework. + */ ControllerService getProxiedControllerService(); + /** + *

+ * Returns the actual implementation of the Controller Service that this ControllerServiceNode + * encapsulates. This direct implementation should NEVER be passed to another + * pluggable component. This implementation should be addressed only by the framework itself. + * If providing the controller service to another pluggable component, provide it with the + * proxied entity obtained via {@link #getProxiedControllerService()} + *

+ * + * @return the actual implementation of the Controller Service + */ ControllerService getControllerServiceImplementation(); + /** + * @return the current state of the Controller Service + */ ControllerServiceState getState(); + /** + * Updates the state of the Controller Service to the provided new state + * @param state the state to set the service to + */ void setState(ControllerServiceState state); + /** + * @return the ControllerServiceReference that describes which components are referencing this Controller Service + */ ControllerServiceReference getReferences(); + /** + * Indicates that the given component is now referencing this Controller Service + * @param referringComponent the component referencing this service + */ void addReference(ConfiguredComponent referringComponent); + /** + * Indicates that the given component is no longer referencing this Controller Service + * @param referringComponent the component that is no longer referencing this service + */ void removeReference(ConfiguredComponent referringComponent); void setComments(String comment); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 5ac4a0b440..7e2c51b8f0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -19,7 +19,12 @@ package org.apache.nifi.controller.scheduling; import static java.util.Objects.requireNonNull; import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -670,47 +675,84 @@ public final class StandardProcessScheduler implements ProcessScheduler { componentLifeCycleThreadPool.execute(enableRunnable); } + @Override public void disableControllerService(final ControllerServiceNode service) { - service.verifyCanDisable(); + disableControllerServices(Collections.singletonList(service)); + } - final ScheduleState state = getScheduleState(requireNonNull(service)); + @Override + public void disableControllerServices(final List services) { + if (requireNonNull(services).isEmpty()) { + return; + } + + final List servicesToDisable = new ArrayList<>(services.size()); + for (final ControllerServiceNode serviceToDisable : services) { + if (serviceToDisable.getState() == ControllerServiceState.DISABLED || serviceToDisable.getState() == ControllerServiceState.DISABLING) { + continue; + } + + servicesToDisable.add(serviceToDisable); + } + + if (servicesToDisable.isEmpty()) { + return; + } + + // ensure that all controller services can be disabled. + for (final ControllerServiceNode serviceNode : servicesToDisable) { + final Set ignoredReferences = new HashSet<>(services); + ignoredReferences.remove(serviceNode); + serviceNode.verifyCanDisable(ignoredReferences); + } + + // mark services as disabling + for (final ControllerServiceNode serviceNode : servicesToDisable) { + serviceNode.setState(ControllerServiceState.DISABLING); + + final ScheduleState scheduleState = getScheduleState(serviceNode); + synchronized (scheduleState) { + scheduleState.setScheduled(false); + } + } + + final Queue nodes = new LinkedList<>(servicesToDisable); final Runnable disableRunnable = new Runnable() { @Override public void run() { - synchronized (state) { - state.setScheduled(false); - } + ControllerServiceNode service; + while ((service = nodes.poll()) != null) { + try (final NarCloseable x = NarCloseable.withNarLoader()) { + final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider, null); - try (final NarCloseable x = NarCloseable.withNarLoader()) { - final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider, null); - - try { - ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext); - } catch (final Exception e) { - final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e; - final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service); - componentLog.error("Failed to invoke @OnDisabled method due to {}", cause); - - LOG.error("Failed to invoke @OnDisabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString()); - if (LOG.isDebugEnabled()) { - LOG.error("", cause); - } - - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext); try { - Thread.sleep(administrativeYieldMillis); - } catch (final InterruptedException ie) { + ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext); + } catch (final Exception e) { + final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e; + final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service); + componentLog.error("Failed to invoke @OnDisabled method due to {}", cause); + + LOG.error("Failed to invoke @OnDisabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString()); + if (LOG.isDebugEnabled()) { + LOG.error("", cause); + } + + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext); + try { + Thread.sleep(administrativeYieldMillis); + } catch (final InterruptedException ie) { + } + } finally { + service.setState(ControllerServiceState.DISABLED); + heartbeater.heartbeat(); } - } finally { - service.setState(ControllerServiceState.DISABLED); - heartbeater.heartbeat(); } } } }; - service.setState(ControllerServiceState.DISABLING); componentLifeCycleThreadPool.execute(disableRunnable); } + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index 85f5a32de3..8e7f1f5f4f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -153,11 +153,17 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i final ControllerServiceReference references = getReferences(); + final Set activeReferences = new HashSet<>(); for (final ConfiguredComponent activeReference : references.getActiveReferences()) { if (!ignoreReferences.contains(activeReference)) { - throw new IllegalStateException(implementation + " cannot be disabled because it is referenced by at least one component that is currently running"); + activeReferences.add(activeReference); } } + + if (!activeReferences.isEmpty()) { + throw new IllegalStateException(implementation + " cannot be disabled because it is referenced by " + activeReferences.size() + + " components that are currently running: " + activeReferences); + } } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index 086891a5bd..1f1a1c0f43 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -214,13 +214,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } Collections.reverse(toDisable); - for (final ControllerServiceNode nodeToDisable : toDisable) { - final ControllerServiceState state = nodeToDisable.getState(); - - if (state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING) { - disableControllerService(nodeToDisable); - } - } + processScheduler.disableControllerServices(toDisable); } @Override @@ -428,7 +422,6 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi @Override public void disableControllerService(final ControllerServiceNode serviceNode) { - serviceNode.verifyCanDisable(); processScheduler.disableControllerService(serviceNode); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java index 03aca7e21b..a3589fae0d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java @@ -20,22 +20,28 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; +import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.StandardProcessorNode; +import org.apache.nifi.controller.scheduling.StandardProcessScheduler; import org.apache.nifi.controller.service.mock.DummyProcessor; import org.apache.nifi.controller.service.mock.ServiceA; import org.apache.nifi.controller.service.mock.ServiceB; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.StandardProcessGroup; import org.apache.nifi.processor.StandardProcessorInitializationContext; import org.apache.nifi.processor.StandardValidationContextFactory; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -43,29 +49,14 @@ import org.mockito.stubbing.Answer; public class TestStandardControllerServiceProvider { + @BeforeClass + public static void setNiFiProps() { + System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties"); + } + private ProcessScheduler createScheduler() { - final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class); - Mockito.doAnswer(new Answer() { - @Override - public Object answer(final InvocationOnMock invocation) throws Throwable { - final ControllerServiceNode node = (ControllerServiceNode) invocation.getArguments()[0]; - node.verifyCanEnable(); - node.setState(ControllerServiceState.ENABLED); - return null; - } - }).when(scheduler).enableControllerService(Mockito.any(ControllerServiceNode.class)); - - Mockito.doAnswer(new Answer() { - @Override - public Object answer(final InvocationOnMock invocation) throws Throwable { - final ControllerServiceNode node = (ControllerServiceNode) invocation.getArguments()[0]; - node.verifyCanDisable(); - node.setState(ControllerServiceState.DISABLED); - return null; - } - }).when(scheduler).disableControllerService(Mockito.any(ControllerServiceNode.class)); - - return scheduler; + final Heartbeater heartbeater = Mockito.mock(Heartbeater.class); + return new StandardProcessScheduler(heartbeater, null, null); } @Test @@ -78,7 +69,7 @@ public class TestStandardControllerServiceProvider { provider.disableControllerService(serviceNode); } - @Test + @Test(timeout=10000) public void testEnableDisableWithReference() { final ProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); @@ -104,10 +95,22 @@ public class TestStandardControllerServiceProvider { } provider.disableControllerService(serviceNodeA); + waitForServiceState(serviceNodeA, ControllerServiceState.DISABLED); + provider.disableControllerService(serviceNodeB); + waitForServiceState(serviceNodeB, ControllerServiceState.DISABLED); } - @Test + private void waitForServiceState(final ControllerServiceNode service, final ControllerServiceState desiredState) { + while (service.getState() != desiredState) { + try { + Thread.sleep(50L); + } catch (final InterruptedException e) { + } + } + } + + @Test(timeout=10000) public void testEnableReferencingServicesGraph() { final ProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); @@ -137,12 +140,20 @@ public class TestStandardControllerServiceProvider { provider.enableControllerService(serviceNode4); provider.enableReferencingServices(serviceNode4); - assertEquals(ControllerServiceState.ENABLED, serviceNode3.getState()); - assertEquals(ControllerServiceState.ENABLED, serviceNode2.getState()); - assertEquals(ControllerServiceState.ENABLED, serviceNode1.getState()); + // Verify that the services are either ENABLING or ENABLED, and wait for all of them to become ENABLED. + // Note that we set a timeout of 10 seconds, in case a bug occurs and the services never become ENABLED. + final Set validStates = new HashSet<>(); + validStates.add(ControllerServiceState.ENABLED); + validStates.add(ControllerServiceState.ENABLING); + + while (serviceNode3.getState() != ControllerServiceState.ENABLED || serviceNode2.getState() != ControllerServiceState.ENABLED || serviceNode1.getState() != ControllerServiceState.ENABLED) { + assertTrue(validStates.contains(serviceNode3.getState())); + assertTrue(validStates.contains(serviceNode2.getState())); + assertTrue(validStates.contains(serviceNode1.getState())); + } } - @Test + @Test(timeout=10000) public void testStartStopReferencingComponents() { final ProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); @@ -208,9 +219,17 @@ public class TestStandardControllerServiceProvider { provider.enableReferencingServices(serviceNode4); provider.scheduleReferencingComponents(serviceNode4); - assertEquals(ControllerServiceState.ENABLED, serviceNode3.getState()); - assertEquals(ControllerServiceState.ENABLED, serviceNode2.getState()); - assertEquals(ControllerServiceState.ENABLED, serviceNode1.getState()); + final Set enableStates = new HashSet<>(); + enableStates.add(ControllerServiceState.ENABLED); + enableStates.add(ControllerServiceState.ENABLING); + + while (serviceNode3.getState() != ControllerServiceState.ENABLED + || serviceNode2.getState() != ControllerServiceState.ENABLED + || serviceNode1.getState() != ControllerServiceState.ENABLED) { + assertTrue(enableStates.contains(serviceNode3.getState())); + assertTrue(enableStates.contains(serviceNode2.getState())); + assertTrue(enableStates.contains(serviceNode1.getState())); + } assertTrue(procNodeA.isRunning()); assertTrue(procNodeB.isRunning()); @@ -218,18 +237,32 @@ public class TestStandardControllerServiceProvider { provider.unscheduleReferencingComponents(serviceNode4); assertFalse(procNodeA.isRunning()); assertFalse(procNodeB.isRunning()); - assertEquals(ControllerServiceState.ENABLED, serviceNode3.getState()); - assertEquals(ControllerServiceState.ENABLED, serviceNode2.getState()); - assertEquals(ControllerServiceState.ENABLED, serviceNode1.getState()); + while (serviceNode3.getState() != ControllerServiceState.ENABLED + || serviceNode2.getState() != ControllerServiceState.ENABLED + || serviceNode1.getState() != ControllerServiceState.ENABLED) { + assertTrue(enableStates.contains(serviceNode3.getState())); + assertTrue(enableStates.contains(serviceNode2.getState())); + assertTrue(enableStates.contains(serviceNode1.getState())); + } provider.disableReferencingServices(serviceNode4); - assertEquals(ControllerServiceState.DISABLED, serviceNode3.getState()); - assertEquals(ControllerServiceState.DISABLED, serviceNode2.getState()); - assertEquals(ControllerServiceState.DISABLED, serviceNode1.getState()); + final Set disableStates = new HashSet<>(); + disableStates.add(ControllerServiceState.DISABLED); + disableStates.add(ControllerServiceState.DISABLING); + + // Wait for the services to be disabled. + while (serviceNode3.getState() != ControllerServiceState.DISABLED + || serviceNode2.getState() != ControllerServiceState.DISABLED + || serviceNode1.getState() != ControllerServiceState.DISABLED) { + assertTrue(disableStates.contains(serviceNode3.getState())); + assertTrue(disableStates.contains(serviceNode2.getState())); + assertTrue(disableStates.contains(serviceNode1.getState())); + } + assertEquals(ControllerServiceState.ENABLED, serviceNode4.getState()); provider.disableControllerService(serviceNode4); - assertEquals(ControllerServiceState.DISABLED, serviceNode4.getState()); + assertTrue(disableStates.contains(serviceNode4.getState())); } @Test @@ -376,4 +409,36 @@ public class TestStandardControllerServiceProvider { assertTrue(ordered.get(1) == serviceNode3); } + private ProcessorNode createProcessor(final ProcessScheduler scheduler, final ControllerServiceProvider serviceProvider) { + final ProcessorNode procNode = new StandardProcessorNode(new DummyProcessor(), UUID.randomUUID().toString(), + new StandardValidationContextFactory(serviceProvider), scheduler, serviceProvider); + + final ProcessGroup group = new StandardProcessGroup(UUID.randomUUID().toString(), serviceProvider, scheduler, null, null); + group.addProcessor(procNode); + procNode.setProcessGroup(group); + + return procNode; + } + + @Test + public void testEnableReferencingComponents() { + final ProcessScheduler scheduler = createScheduler(); + final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(null, null); + final ControllerServiceNode serviceNode = provider.createControllerService(ServiceA.class.getName(), "1", false); + + final ProcessorNode procNode = createProcessor(scheduler, provider); + serviceNode.addReference(procNode); + + procNode.setScheduledState(ScheduledState.STOPPED); + provider.unscheduleReferencingComponents(serviceNode); + assertEquals(ScheduledState.STOPPED, procNode.getScheduledState()); + + procNode.setScheduledState(ScheduledState.RUNNING); + provider.unscheduleReferencingComponents(serviceNode); + assertEquals(ScheduledState.STOPPED, procNode.getScheduledState()); + + procNode.setScheduledState(ScheduledState.DISABLED); + provider.unscheduleReferencingComponents(serviceNode); + assertEquals(ScheduledState.DISABLED, procNode.getScheduledState()); + } }