NIFI-2032 fixed 'enableControllerServices' logic

added getRequiredControllerServices() operation to ControllerServiceNode
This commit is contained in:
Oleg Zhurakousky 2016-06-17 12:08:26 -04:00
parent c955ec1689
commit 6402eb9315
5 changed files with 204 additions and 96 deletions

View File

@ -20,6 +20,7 @@ import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -50,6 +51,16 @@ public interface ControllerServiceNode extends ConfiguredComponent {
*/ */
ControllerService getProxiedControllerService(); ControllerService getProxiedControllerService();
/**
* Returns the list of services that are required to be enabled before this
* service is enabled. The returned list is flattened and contains both
* immediate and transient dependencies.
*
* @return list of services required to be enabled before this service is
* enabled
*/
List<ControllerServiceNode> getRequiredControllerServices();
/** /**
* <p> * <p>
* Returns the actual implementation of the Controller Service that this ControllerServiceNode * Returns the actual implementation of the Controller Service that this ControllerServiceNode

View File

@ -17,10 +17,13 @@
package org.apache.nifi.controller.service; package org.apache.nifi.controller.service;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -35,6 +38,7 @@ import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory; import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType; import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractConfiguredComponent; import org.apache.nifi.controller.AbstractConfiguredComponent;
import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfigurationContext;
@ -164,6 +168,21 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
} }
} }
@Override
public List<ControllerServiceNode> getRequiredControllerServices() {
List<ControllerServiceNode> requiredServices = new ArrayList<>();
for (Entry<PropertyDescriptor, String> pEntry : this.getProperties().entrySet()) {
PropertyDescriptor descriptor = pEntry.getKey();
if (descriptor.getControllerServiceDefinition() != null && descriptor.isRequired()) {
ControllerServiceNode rNode = this.processGroup.getControllerService(pEntry.getValue());
requiredServices.add(rNode);
requiredServices.addAll(rNode.getRequiredControllerServices());
}
}
return requiredServices;
}
@Override @Override
public void removeReference(final ConfiguredComponent referencingComponent) { public void removeReference(final ConfiguredComponent referencingComponent) {
writeLock.lock(); writeLock.lock();

View File

@ -25,14 +25,13 @@ import java.lang.reflect.Proxy;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.Comparator;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.annotation.lifecycle.OnAdded;
@ -50,7 +49,6 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.exception.ComponentLifeCycleException; import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.controller.exception.ControllerServiceInstantiationException; import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.ExtensionManager;
@ -58,7 +56,6 @@ import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardValidationContextFactory; import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.ObjectHolder; import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -70,7 +67,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
private final ProcessScheduler processScheduler; private final ProcessScheduler processScheduler;
private static final Set<Method> validDisabledMethods; private static final Set<Method> validDisabledMethods;
private final BulletinRepository bulletinRepo; // private final BulletinRepository bulletinRepo;
private final StateManagerProvider stateManagerProvider; private final StateManagerProvider stateManagerProvider;
private final FlowController flowController; private final FlowController flowController;
@ -91,7 +88,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
this.flowController = flowController; this.flowController = flowController;
this.processScheduler = scheduler; this.processScheduler = scheduler;
this.bulletinRepo = bulletinRepo; // this.bulletinRepo = bulletinRepo;
this.stateManagerProvider = stateManagerProvider; this.stateManagerProvider = stateManagerProvider;
} }
@ -372,98 +369,32 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override @Override
public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) { public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) {
final Set<ControllerServiceNode> servicesToEnable = new HashSet<>(); boolean shouldStart = true;
// Ensure that all nodes are already disabled
for (final ControllerServiceNode serviceNode : serviceNodes) {
final ControllerServiceState curState = serviceNode.getState();
if (ControllerServiceState.DISABLED.equals(curState)) {
servicesToEnable.add(serviceNode);
} else {
logger.warn("Cannot enable {} because it is not disabled; current state is {}", serviceNode, curState);
}
}
// determine the order to load the services. We have to ensure that if service A references service B, then B Iterator<ControllerServiceNode> serviceIter = serviceNodes.iterator();
// is enabled first, and so on. while (serviceIter.hasNext() && shouldStart) {
final Map<String, ControllerServiceNode> idToNodeMap = new HashMap<>(); ControllerServiceNode controllerServiceNode = serviceIter.next();
for (final ControllerServiceNode node : servicesToEnable) { List<ControllerServiceNode> requiredServices = ((StandardControllerServiceNode) controllerServiceNode).getRequiredControllerServices();
idToNodeMap.put(node.getIdentifier(), node); for (ControllerServiceNode requiredService : requiredServices) {
} if (!requiredService.isActive() && !serviceNodes.contains(requiredService)) {
shouldStart = false;
// We can have many Controller Services dependent on one another. We can have many of these
// disparate lists of Controller Services that are dependent on one another. We refer to each
// of these as a branch.
final List<List<ControllerServiceNode>> branches = determineEnablingOrder(idToNodeMap);
if (branches.isEmpty()) {
logger.info("No Controller Services to enable");
return;
} else {
logger.info("Will enable {} Controller Services", servicesToEnable.size());
}
final Set<ControllerServiceNode> enabledNodes = Collections.synchronizedSet(new HashSet<ControllerServiceNode>());
final ExecutorService executor = Executors.newFixedThreadPool(Math.min(10, branches.size()), new ThreadFactory() {
@Override
public Thread newThread(final Runnable r) {
final Thread t = Executors.defaultThreadFactory().newThread(r);
t.setDaemon(true);
t.setName("Enable Controller Services");
return t;
}
});
for (final List<ControllerServiceNode> branch : branches) {
final Runnable enableBranchRunnable = new Runnable() {
@Override
public void run() {
logger.debug("Enabling Controller Service Branch {}", branch);
for (final ControllerServiceNode serviceNode : branch) {
try {
if (!enabledNodes.contains(serviceNode)) {
enabledNodes.add(serviceNode);
logger.info("Enabling {}", serviceNode);
try {
serviceNode.verifyCanEnable();
processScheduler.enableControllerService(serviceNode);
} catch (final Exception e) {
logger.error("Failed to enable " + serviceNode + " due to " + e);
if (logger.isDebugEnabled()) {
logger.error("", e);
}
if (bulletinRepo != null) {
bulletinRepo.addBulletin(BulletinFactory.createBulletin(
"Controller Service", Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e));
}
}
}
// wait for service to finish enabling.
while (ControllerServiceState.ENABLING.equals(serviceNode.getState())) {
try {
Thread.sleep(100L);
} catch (final InterruptedException ie) {
}
}
logger.info("State for {} is now {}", serviceNode, serviceNode.getState());
} catch (final Exception e) {
logger.error("Failed to enable {} due to {}", serviceNode, e.toString());
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
} }
}; }
executor.submit(enableBranchRunnable);
} }
executor.shutdown(); if (shouldStart) {
List<ControllerServiceNode> services = new ArrayList<>(serviceNodes);
Collections.sort(services, new Comparator<ControllerServiceNode>() {
@Override
public int compare(ControllerServiceNode s1, ControllerServiceNode s2) {
return s2.getRequiredControllerServices().contains(s1) ? -1 : 1;
}
});
for (ControllerServiceNode controllerServiceNode : services) {
this.enableControllerService(controllerServiceNode);
}
}
} }
static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) { static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) {

View File

@ -17,9 +17,11 @@
package org.apache.nifi.controller.service; package org.apache.nifi.controller.service;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.beans.PropertyDescriptor; import java.beans.PropertyDescriptor;
import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
@ -39,6 +41,7 @@ import org.apache.nifi.controller.service.mock.DummyProcessor;
import org.apache.nifi.controller.service.mock.MockProcessGroup; import org.apache.nifi.controller.service.mock.MockProcessGroup;
import org.apache.nifi.controller.service.mock.ServiceA; import org.apache.nifi.controller.service.mock.ServiceA;
import org.apache.nifi.controller.service.mock.ServiceB; import org.apache.nifi.controller.service.mock.ServiceB;
import org.apache.nifi.controller.service.mock.ServiceC;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.StandardProcessGroup; import org.apache.nifi.groups.StandardProcessGroup;
import org.apache.nifi.processor.StandardValidationContextFactory; import org.apache.nifi.processor.StandardValidationContextFactory;
@ -386,4 +389,93 @@ public class TestStandardControllerServiceProvider {
provider.unscheduleReferencingComponents(serviceNode); provider.unscheduleReferencingComponents(serviceNode);
assertEquals(ScheduledState.STOPPED, procNode.getScheduledState()); assertEquals(ScheduledState.STOPPED, procNode.getScheduledState());
} }
@Test
public void validateEnableServices() {
StandardProcessScheduler scheduler = createScheduler();
FlowController controller = Mockito.mock(FlowController.class);
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider);
ProcessGroup procGroup = new MockProcessGroup();
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false);
ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false);
ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false);
ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false);
ControllerServiceNode serviceNode5 = provider.createControllerService(ServiceA.class.getName(), "5", false);
procGroup.addControllerService(serviceNode1);
procGroup.addControllerService(serviceNode2);
procGroup.addControllerService(serviceNode3);
procGroup.addControllerService(serviceNode4);
procGroup.addControllerService(serviceNode5);
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
serviceNode5.setProperty(ServiceA.OTHER_SERVICE.getName(), "1");
provider.enableControllerServices(
Arrays.asList(new ControllerServiceNode[] { serviceNode1, serviceNode2, serviceNode3, serviceNode4, serviceNode5}));
assertTrue(serviceNode1.isActive());
assertTrue(serviceNode2.isActive());
assertTrue(serviceNode3.isActive());
assertTrue(serviceNode4.isActive());
assertTrue(serviceNode5.isActive());
}
@Test
public void validateEnableServicesWithDisabledMissingService() {
StandardProcessScheduler scheduler = createScheduler();
FlowController controller = Mockito.mock(FlowController.class);
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider);
ProcessGroup procGroup = new MockProcessGroup();
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false);
ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false);
ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false);
ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false);
ControllerServiceNode serviceNode5 = provider.createControllerService(ServiceA.class.getName(), "5", false);
ControllerServiceNode serviceNode6 = provider.createControllerService(ServiceB.class.getName(), "6", false);
ControllerServiceNode serviceNode7 = provider.createControllerService(ServiceC.class.getName(), "7", false);
procGroup.addControllerService(serviceNode1);
procGroup.addControllerService(serviceNode2);
procGroup.addControllerService(serviceNode3);
procGroup.addControllerService(serviceNode4);
procGroup.addControllerService(serviceNode5);
procGroup.addControllerService(serviceNode6);
procGroup.addControllerService(serviceNode7);
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
serviceNode5.setProperty(ServiceA.OTHER_SERVICE.getName(), "6");
serviceNode7.setProperty(ServiceC.REQ_SERVICE_1.getName(), "2");
serviceNode7.setProperty(ServiceC.REQ_SERVICE_2.getName(), "3");
provider.enableControllerServices(Arrays.asList(
new ControllerServiceNode[] { serviceNode1, serviceNode2, serviceNode3, serviceNode4, serviceNode5, serviceNode7}));
assertFalse(serviceNode1.isActive());
assertFalse(serviceNode2.isActive());
assertFalse(serviceNode3.isActive());
assertFalse(serviceNode4.isActive());
assertFalse(serviceNode5.isActive());
assertFalse(serviceNode6.isActive());
provider.enableControllerService(serviceNode6);
provider.enableControllerServices(Arrays.asList(
new ControllerServiceNode[] { serviceNode1, serviceNode2, serviceNode3, serviceNode4, serviceNode5 }));
assertTrue(serviceNode1.isActive());
assertTrue(serviceNode2.isActive());
assertTrue(serviceNode3.isActive());
assertTrue(serviceNode4.isActive());
assertTrue(serviceNode5.isActive());
assertTrue(serviceNode6.isActive());
}
} }

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.service.mock;
import java.util.ArrayList;
import java.util.List;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ControllerService;
public class ServiceC extends AbstractControllerService {
public static final PropertyDescriptor REQ_SERVICE_1 = new PropertyDescriptor.Builder()
.name("S1")
.identifiesControllerService(ControllerService.class)
.required(true)
.build();
public static final PropertyDescriptor REQ_SERVICE_2 = new PropertyDescriptor.Builder()
.name("S2")
.identifiesControllerService(ControllerService.class)
.required(true)
.build();
public static final PropertyDescriptor OPT_SERVICE = new PropertyDescriptor.Builder()
.name("S3")
.identifiesControllerService(ControllerService.class)
.required(false)
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(REQ_SERVICE_1);
descriptors.add(REQ_SERVICE_2);
descriptors.add(OPT_SERVICE);
return descriptors;
}
}