diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java index bd3a42b989..c0ff480c69 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java @@ -20,6 +20,7 @@ import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.groups.ProcessGroup; +import java.util.List; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; @@ -50,6 +51,16 @@ public interface ControllerServiceNode extends ConfiguredComponent { */ ControllerService getProxiedControllerService(); + /** + * Returns the list of services that are required to be enabled before this + * service is enabled. The returned list is flattened and contains both + * immediate and transient dependencies. + * + * @return list of services required to be enabled before this service is + * enabled + */ + List getRequiredControllerServices(); + /** *

* Returns the actual implementation of the Controller Service that this ControllerServiceNode diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index 6e29053743..05dbc2dcfc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -17,10 +17,13 @@ package org.apache.nifi.controller.service; import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; +import java.util.Map.Entry; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -35,6 +38,7 @@ import org.apache.nifi.authorization.Resource; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.resource.ResourceType; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractConfiguredComponent; import org.apache.nifi.controller.ConfigurationContext; @@ -164,6 +168,21 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i } } + @Override + public List getRequiredControllerServices() { + List requiredServices = new ArrayList<>(); + for (Entry pEntry : this.getProperties().entrySet()) { + PropertyDescriptor descriptor = pEntry.getKey(); + if (descriptor.getControllerServiceDefinition() != null && descriptor.isRequired()) { + ControllerServiceNode rNode = this.processGroup.getControllerService(pEntry.getValue()); + requiredServices.add(rNode); + requiredServices.addAll(rNode.getRequiredControllerServices()); + } + } + return requiredServices; + } + + @Override public void removeReference(final ConfiguredComponent referencingComponent) { writeLock.lock(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index 4e3249fd03..3817609c5e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -25,14 +25,13 @@ import java.lang.reflect.Proxy; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; +import java.util.Comparator; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnAdded; @@ -50,7 +49,6 @@ import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.exception.ComponentLifeCycleException; import org.apache.nifi.controller.exception.ControllerServiceInstantiationException; -import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.nar.ExtensionManager; @@ -58,7 +56,6 @@ import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardValidationContextFactory; import org.apache.nifi.reporting.BulletinRepository; -import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.ObjectHolder; import org.apache.nifi.util.ReflectionUtils; import org.slf4j.Logger; @@ -70,7 +67,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi private final ProcessScheduler processScheduler; private static final Set validDisabledMethods; - private final BulletinRepository bulletinRepo; + // private final BulletinRepository bulletinRepo; private final StateManagerProvider stateManagerProvider; private final FlowController flowController; @@ -91,7 +88,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi this.flowController = flowController; this.processScheduler = scheduler; - this.bulletinRepo = bulletinRepo; + // this.bulletinRepo = bulletinRepo; this.stateManagerProvider = stateManagerProvider; } @@ -372,98 +369,32 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi @Override public void enableControllerServices(final Collection serviceNodes) { - final Set servicesToEnable = new HashSet<>(); - // Ensure that all nodes are already disabled - for (final ControllerServiceNode serviceNode : serviceNodes) { - final ControllerServiceState curState = serviceNode.getState(); - if (ControllerServiceState.DISABLED.equals(curState)) { - servicesToEnable.add(serviceNode); - } else { - logger.warn("Cannot enable {} because it is not disabled; current state is {}", serviceNode, curState); - } - } + boolean shouldStart = true; - // determine the order to load the services. We have to ensure that if service A references service B, then B - // is enabled first, and so on. - final Map idToNodeMap = new HashMap<>(); - for (final ControllerServiceNode node : servicesToEnable) { - idToNodeMap.put(node.getIdentifier(), node); - } - - // We can have many Controller Services dependent on one another. We can have many of these - // disparate lists of Controller Services that are dependent on one another. We refer to each - // of these as a branch. - final List> branches = determineEnablingOrder(idToNodeMap); - - if (branches.isEmpty()) { - logger.info("No Controller Services to enable"); - return; - } else { - logger.info("Will enable {} Controller Services", servicesToEnable.size()); - } - - final Set enabledNodes = Collections.synchronizedSet(new HashSet()); - final ExecutorService executor = Executors.newFixedThreadPool(Math.min(10, branches.size()), new ThreadFactory() { - @Override - public Thread newThread(final Runnable r) { - final Thread t = Executors.defaultThreadFactory().newThread(r); - t.setDaemon(true); - t.setName("Enable Controller Services"); - return t; - } - }); - - for (final List branch : branches) { - final Runnable enableBranchRunnable = new Runnable() { - @Override - public void run() { - logger.debug("Enabling Controller Service Branch {}", branch); - - for (final ControllerServiceNode serviceNode : branch) { - try { - if (!enabledNodes.contains(serviceNode)) { - enabledNodes.add(serviceNode); - - logger.info("Enabling {}", serviceNode); - try { - serviceNode.verifyCanEnable(); - processScheduler.enableControllerService(serviceNode); - } catch (final Exception e) { - logger.error("Failed to enable " + serviceNode + " due to " + e); - if (logger.isDebugEnabled()) { - logger.error("", e); - } - - if (bulletinRepo != null) { - bulletinRepo.addBulletin(BulletinFactory.createBulletin( - "Controller Service", Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e)); - } - } - } - - // wait for service to finish enabling. - while (ControllerServiceState.ENABLING.equals(serviceNode.getState())) { - try { - Thread.sleep(100L); - } catch (final InterruptedException ie) { - } - } - - logger.info("State for {} is now {}", serviceNode, serviceNode.getState()); - } catch (final Exception e) { - logger.error("Failed to enable {} due to {}", serviceNode, e.toString()); - if (logger.isDebugEnabled()) { - logger.error("", e); - } - } - } + Iterator serviceIter = serviceNodes.iterator(); + while (serviceIter.hasNext() && shouldStart) { + ControllerServiceNode controllerServiceNode = serviceIter.next(); + List requiredServices = ((StandardControllerServiceNode) controllerServiceNode).getRequiredControllerServices(); + for (ControllerServiceNode requiredService : requiredServices) { + if (!requiredService.isActive() && !serviceNodes.contains(requiredService)) { + shouldStart = false; } - }; - - executor.submit(enableBranchRunnable); + } } - executor.shutdown(); + if (shouldStart) { + List services = new ArrayList<>(serviceNodes); + Collections.sort(services, new Comparator() { + @Override + public int compare(ControllerServiceNode s1, ControllerServiceNode s2) { + return s2.getRequiredControllerServices().contains(s1) ? -1 : 1; + } + }); + + for (ControllerServiceNode controllerServiceNode : services) { + this.enableControllerService(controllerServiceNode); + } + } } static List> determineEnablingOrder(final Map serviceNodeMap) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java index 91a2e7acfe..16f5481e3b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java @@ -17,9 +17,11 @@ package org.apache.nifi.controller.service; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.beans.PropertyDescriptor; +import java.util.Arrays; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -39,6 +41,7 @@ import org.apache.nifi.controller.service.mock.DummyProcessor; import org.apache.nifi.controller.service.mock.MockProcessGroup; import org.apache.nifi.controller.service.mock.ServiceA; import org.apache.nifi.controller.service.mock.ServiceB; +import org.apache.nifi.controller.service.mock.ServiceC; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.StandardProcessGroup; import org.apache.nifi.processor.StandardValidationContextFactory; @@ -386,4 +389,93 @@ public class TestStandardControllerServiceProvider { provider.unscheduleReferencingComponents(serviceNode); assertEquals(ScheduledState.STOPPED, procNode.getScheduledState()); } + + @Test + public void validateEnableServices() { + StandardProcessScheduler scheduler = createScheduler(); + FlowController controller = Mockito.mock(FlowController.class); + StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider); + ProcessGroup procGroup = new MockProcessGroup(); + Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup); + + ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false); + ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false); + ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false); + ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false); + ControllerServiceNode serviceNode5 = provider.createControllerService(ServiceA.class.getName(), "5", false); + + procGroup.addControllerService(serviceNode1); + procGroup.addControllerService(serviceNode2); + procGroup.addControllerService(serviceNode3); + procGroup.addControllerService(serviceNode4); + procGroup.addControllerService(serviceNode5); + + serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4"); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4"); + serviceNode5.setProperty(ServiceA.OTHER_SERVICE.getName(), "1"); + + provider.enableControllerServices( + Arrays.asList(new ControllerServiceNode[] { serviceNode1, serviceNode2, serviceNode3, serviceNode4, serviceNode5})); + + assertTrue(serviceNode1.isActive()); + assertTrue(serviceNode2.isActive()); + assertTrue(serviceNode3.isActive()); + assertTrue(serviceNode4.isActive()); + assertTrue(serviceNode5.isActive()); + } + + @Test + public void validateEnableServicesWithDisabledMissingService() { + StandardProcessScheduler scheduler = createScheduler(); + FlowController controller = Mockito.mock(FlowController.class); + StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider); + ProcessGroup procGroup = new MockProcessGroup(); + Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup); + + ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false); + ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false); + ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false); + ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false); + ControllerServiceNode serviceNode5 = provider.createControllerService(ServiceA.class.getName(), "5", false); + ControllerServiceNode serviceNode6 = provider.createControllerService(ServiceB.class.getName(), "6", false); + ControllerServiceNode serviceNode7 = provider.createControllerService(ServiceC.class.getName(), "7", false); + + procGroup.addControllerService(serviceNode1); + procGroup.addControllerService(serviceNode2); + procGroup.addControllerService(serviceNode3); + procGroup.addControllerService(serviceNode4); + procGroup.addControllerService(serviceNode5); + procGroup.addControllerService(serviceNode6); + procGroup.addControllerService(serviceNode7); + + serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4"); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); + serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4"); + serviceNode5.setProperty(ServiceA.OTHER_SERVICE.getName(), "6"); + serviceNode7.setProperty(ServiceC.REQ_SERVICE_1.getName(), "2"); + serviceNode7.setProperty(ServiceC.REQ_SERVICE_2.getName(), "3"); + + provider.enableControllerServices(Arrays.asList( + new ControllerServiceNode[] { serviceNode1, serviceNode2, serviceNode3, serviceNode4, serviceNode5, serviceNode7})); + assertFalse(serviceNode1.isActive()); + assertFalse(serviceNode2.isActive()); + assertFalse(serviceNode3.isActive()); + assertFalse(serviceNode4.isActive()); + assertFalse(serviceNode5.isActive()); + assertFalse(serviceNode6.isActive()); + + provider.enableControllerService(serviceNode6); + provider.enableControllerServices(Arrays.asList( + new ControllerServiceNode[] { serviceNode1, serviceNode2, serviceNode3, serviceNode4, serviceNode5 })); + + assertTrue(serviceNode1.isActive()); + assertTrue(serviceNode2.isActive()); + assertTrue(serviceNode3.isActive()); + assertTrue(serviceNode4.isActive()); + assertTrue(serviceNode5.isActive()); + assertTrue(serviceNode6.isActive()); + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceC.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceC.java new file mode 100644 index 0000000000..8c46bb6da1 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceC.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.service.mock; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ControllerService; + +public class ServiceC extends AbstractControllerService { + + public static final PropertyDescriptor REQ_SERVICE_1 = new PropertyDescriptor.Builder() + .name("S1") + .identifiesControllerService(ControllerService.class) + .required(true) + .build(); + + public static final PropertyDescriptor REQ_SERVICE_2 = new PropertyDescriptor.Builder() + .name("S2") + .identifiesControllerService(ControllerService.class) + .required(true) + .build(); + + public static final PropertyDescriptor OPT_SERVICE = new PropertyDescriptor.Builder() + .name("S3") + .identifiesControllerService(ControllerService.class) + .required(false) + .build(); + + @Override + protected List getSupportedPropertyDescriptors() { + final List descriptors = new ArrayList<>(); + descriptors.add(REQ_SERVICE_1); + descriptors.add(REQ_SERVICE_2); + descriptors.add(OPT_SERVICE); + return descriptors; + } + +}