mirror of https://github.com/apache/nifi.git
NIFI-2032 fixed 'enableControllerServices' logic
added getRequiredControllerServices() operation to ControllerServiceNode
This commit is contained in:
parent
c955ec1689
commit
6402eb9315
|
@ -20,6 +20,7 @@ import org.apache.nifi.controller.ConfiguredComponent;
|
|||
import org.apache.nifi.controller.ControllerService;
|
||||
import org.apache.nifi.groups.ProcessGroup;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
|
@ -50,6 +51,16 @@ public interface ControllerServiceNode extends ConfiguredComponent {
|
|||
*/
|
||||
ControllerService getProxiedControllerService();
|
||||
|
||||
/**
|
||||
* Returns the list of services that are required to be enabled before this
|
||||
* service is enabled. The returned list is flattened and contains both
|
||||
* immediate and transient dependencies.
|
||||
*
|
||||
* @return list of services required to be enabled before this service is
|
||||
* enabled
|
||||
*/
|
||||
List<ControllerServiceNode> getRequiredControllerServices();
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Returns the actual implementation of the Controller Service that this ControllerServiceNode
|
||||
|
|
|
@ -17,10 +17,13 @@
|
|||
package org.apache.nifi.controller.service;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -35,6 +38,7 @@ import org.apache.nifi.authorization.Resource;
|
|||
import org.apache.nifi.authorization.resource.Authorizable;
|
||||
import org.apache.nifi.authorization.resource.ResourceFactory;
|
||||
import org.apache.nifi.authorization.resource.ResourceType;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.controller.AbstractConfiguredComponent;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
|
@ -164,6 +168,21 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ControllerServiceNode> getRequiredControllerServices() {
|
||||
List<ControllerServiceNode> requiredServices = new ArrayList<>();
|
||||
for (Entry<PropertyDescriptor, String> pEntry : this.getProperties().entrySet()) {
|
||||
PropertyDescriptor descriptor = pEntry.getKey();
|
||||
if (descriptor.getControllerServiceDefinition() != null && descriptor.isRequired()) {
|
||||
ControllerServiceNode rNode = this.processGroup.getControllerService(pEntry.getValue());
|
||||
requiredServices.add(rNode);
|
||||
requiredServices.addAll(rNode.getRequiredControllerServices());
|
||||
}
|
||||
}
|
||||
return requiredServices;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void removeReference(final ConfiguredComponent referencingComponent) {
|
||||
writeLock.lock();
|
||||
|
|
|
@ -25,14 +25,13 @@ import java.lang.reflect.Proxy;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.annotation.lifecycle.OnAdded;
|
||||
|
@ -50,7 +49,6 @@ import org.apache.nifi.controller.ScheduledState;
|
|||
import org.apache.nifi.controller.ValidationContextFactory;
|
||||
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
|
||||
import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
|
||||
import org.apache.nifi.events.BulletinFactory;
|
||||
import org.apache.nifi.groups.ProcessGroup;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.nar.ExtensionManager;
|
||||
|
@ -58,7 +56,6 @@ import org.apache.nifi.nar.NarCloseable;
|
|||
import org.apache.nifi.processor.SimpleProcessLogger;
|
||||
import org.apache.nifi.processor.StandardValidationContextFactory;
|
||||
import org.apache.nifi.reporting.BulletinRepository;
|
||||
import org.apache.nifi.reporting.Severity;
|
||||
import org.apache.nifi.util.ObjectHolder;
|
||||
import org.apache.nifi.util.ReflectionUtils;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -70,7 +67,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
|||
|
||||
private final ProcessScheduler processScheduler;
|
||||
private static final Set<Method> validDisabledMethods;
|
||||
private final BulletinRepository bulletinRepo;
|
||||
// private final BulletinRepository bulletinRepo;
|
||||
private final StateManagerProvider stateManagerProvider;
|
||||
private final FlowController flowController;
|
||||
|
||||
|
@ -91,7 +88,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
|||
|
||||
this.flowController = flowController;
|
||||
this.processScheduler = scheduler;
|
||||
this.bulletinRepo = bulletinRepo;
|
||||
// this.bulletinRepo = bulletinRepo;
|
||||
this.stateManagerProvider = stateManagerProvider;
|
||||
}
|
||||
|
||||
|
@ -372,98 +369,32 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
|||
|
||||
@Override
|
||||
public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) {
|
||||
final Set<ControllerServiceNode> servicesToEnable = new HashSet<>();
|
||||
// Ensure that all nodes are already disabled
|
||||
for (final ControllerServiceNode serviceNode : serviceNodes) {
|
||||
final ControllerServiceState curState = serviceNode.getState();
|
||||
if (ControllerServiceState.DISABLED.equals(curState)) {
|
||||
servicesToEnable.add(serviceNode);
|
||||
} else {
|
||||
logger.warn("Cannot enable {} because it is not disabled; current state is {}", serviceNode, curState);
|
||||
}
|
||||
}
|
||||
boolean shouldStart = true;
|
||||
|
||||
// determine the order to load the services. We have to ensure that if service A references service B, then B
|
||||
// is enabled first, and so on.
|
||||
final Map<String, ControllerServiceNode> idToNodeMap = new HashMap<>();
|
||||
for (final ControllerServiceNode node : servicesToEnable) {
|
||||
idToNodeMap.put(node.getIdentifier(), node);
|
||||
}
|
||||
|
||||
// We can have many Controller Services dependent on one another. We can have many of these
|
||||
// disparate lists of Controller Services that are dependent on one another. We refer to each
|
||||
// of these as a branch.
|
||||
final List<List<ControllerServiceNode>> branches = determineEnablingOrder(idToNodeMap);
|
||||
|
||||
if (branches.isEmpty()) {
|
||||
logger.info("No Controller Services to enable");
|
||||
return;
|
||||
} else {
|
||||
logger.info("Will enable {} Controller Services", servicesToEnable.size());
|
||||
}
|
||||
|
||||
final Set<ControllerServiceNode> enabledNodes = Collections.synchronizedSet(new HashSet<ControllerServiceNode>());
|
||||
final ExecutorService executor = Executors.newFixedThreadPool(Math.min(10, branches.size()), new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(final Runnable r) {
|
||||
final Thread t = Executors.defaultThreadFactory().newThread(r);
|
||||
t.setDaemon(true);
|
||||
t.setName("Enable Controller Services");
|
||||
return t;
|
||||
}
|
||||
});
|
||||
|
||||
for (final List<ControllerServiceNode> branch : branches) {
|
||||
final Runnable enableBranchRunnable = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
logger.debug("Enabling Controller Service Branch {}", branch);
|
||||
|
||||
for (final ControllerServiceNode serviceNode : branch) {
|
||||
try {
|
||||
if (!enabledNodes.contains(serviceNode)) {
|
||||
enabledNodes.add(serviceNode);
|
||||
|
||||
logger.info("Enabling {}", serviceNode);
|
||||
try {
|
||||
serviceNode.verifyCanEnable();
|
||||
processScheduler.enableControllerService(serviceNode);
|
||||
} catch (final Exception e) {
|
||||
logger.error("Failed to enable " + serviceNode + " due to " + e);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.error("", e);
|
||||
}
|
||||
|
||||
if (bulletinRepo != null) {
|
||||
bulletinRepo.addBulletin(BulletinFactory.createBulletin(
|
||||
"Controller Service", Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// wait for service to finish enabling.
|
||||
while (ControllerServiceState.ENABLING.equals(serviceNode.getState())) {
|
||||
try {
|
||||
Thread.sleep(100L);
|
||||
} catch (final InterruptedException ie) {
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("State for {} is now {}", serviceNode, serviceNode.getState());
|
||||
} catch (final Exception e) {
|
||||
logger.error("Failed to enable {} due to {}", serviceNode, e.toString());
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.error("", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Iterator<ControllerServiceNode> serviceIter = serviceNodes.iterator();
|
||||
while (serviceIter.hasNext() && shouldStart) {
|
||||
ControllerServiceNode controllerServiceNode = serviceIter.next();
|
||||
List<ControllerServiceNode> requiredServices = ((StandardControllerServiceNode) controllerServiceNode).getRequiredControllerServices();
|
||||
for (ControllerServiceNode requiredService : requiredServices) {
|
||||
if (!requiredService.isActive() && !serviceNodes.contains(requiredService)) {
|
||||
shouldStart = false;
|
||||
}
|
||||
};
|
||||
|
||||
executor.submit(enableBranchRunnable);
|
||||
}
|
||||
}
|
||||
|
||||
executor.shutdown();
|
||||
if (shouldStart) {
|
||||
List<ControllerServiceNode> services = new ArrayList<>(serviceNodes);
|
||||
Collections.sort(services, new Comparator<ControllerServiceNode>() {
|
||||
@Override
|
||||
public int compare(ControllerServiceNode s1, ControllerServiceNode s2) {
|
||||
return s2.getRequiredControllerServices().contains(s1) ? -1 : 1;
|
||||
}
|
||||
});
|
||||
|
||||
for (ControllerServiceNode controllerServiceNode : services) {
|
||||
this.enableControllerService(controllerServiceNode);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) {
|
||||
|
|
|
@ -17,9 +17,11 @@
|
|||
package org.apache.nifi.controller.service;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.beans.PropertyDescriptor;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
|
@ -39,6 +41,7 @@ import org.apache.nifi.controller.service.mock.DummyProcessor;
|
|||
import org.apache.nifi.controller.service.mock.MockProcessGroup;
|
||||
import org.apache.nifi.controller.service.mock.ServiceA;
|
||||
import org.apache.nifi.controller.service.mock.ServiceB;
|
||||
import org.apache.nifi.controller.service.mock.ServiceC;
|
||||
import org.apache.nifi.groups.ProcessGroup;
|
||||
import org.apache.nifi.groups.StandardProcessGroup;
|
||||
import org.apache.nifi.processor.StandardValidationContextFactory;
|
||||
|
@ -386,4 +389,93 @@ public class TestStandardControllerServiceProvider {
|
|||
provider.unscheduleReferencingComponents(serviceNode);
|
||||
assertEquals(ScheduledState.STOPPED, procNode.getScheduledState());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void validateEnableServices() {
|
||||
StandardProcessScheduler scheduler = createScheduler();
|
||||
FlowController controller = Mockito.mock(FlowController.class);
|
||||
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider);
|
||||
ProcessGroup procGroup = new MockProcessGroup();
|
||||
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
|
||||
|
||||
ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false);
|
||||
ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false);
|
||||
ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false);
|
||||
ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false);
|
||||
ControllerServiceNode serviceNode5 = provider.createControllerService(ServiceA.class.getName(), "5", false);
|
||||
|
||||
procGroup.addControllerService(serviceNode1);
|
||||
procGroup.addControllerService(serviceNode2);
|
||||
procGroup.addControllerService(serviceNode3);
|
||||
procGroup.addControllerService(serviceNode4);
|
||||
procGroup.addControllerService(serviceNode5);
|
||||
|
||||
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
|
||||
serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
|
||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
|
||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
|
||||
serviceNode5.setProperty(ServiceA.OTHER_SERVICE.getName(), "1");
|
||||
|
||||
provider.enableControllerServices(
|
||||
Arrays.asList(new ControllerServiceNode[] { serviceNode1, serviceNode2, serviceNode3, serviceNode4, serviceNode5}));
|
||||
|
||||
assertTrue(serviceNode1.isActive());
|
||||
assertTrue(serviceNode2.isActive());
|
||||
assertTrue(serviceNode3.isActive());
|
||||
assertTrue(serviceNode4.isActive());
|
||||
assertTrue(serviceNode5.isActive());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void validateEnableServicesWithDisabledMissingService() {
|
||||
StandardProcessScheduler scheduler = createScheduler();
|
||||
FlowController controller = Mockito.mock(FlowController.class);
|
||||
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider);
|
||||
ProcessGroup procGroup = new MockProcessGroup();
|
||||
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
|
||||
|
||||
ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false);
|
||||
ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false);
|
||||
ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false);
|
||||
ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false);
|
||||
ControllerServiceNode serviceNode5 = provider.createControllerService(ServiceA.class.getName(), "5", false);
|
||||
ControllerServiceNode serviceNode6 = provider.createControllerService(ServiceB.class.getName(), "6", false);
|
||||
ControllerServiceNode serviceNode7 = provider.createControllerService(ServiceC.class.getName(), "7", false);
|
||||
|
||||
procGroup.addControllerService(serviceNode1);
|
||||
procGroup.addControllerService(serviceNode2);
|
||||
procGroup.addControllerService(serviceNode3);
|
||||
procGroup.addControllerService(serviceNode4);
|
||||
procGroup.addControllerService(serviceNode5);
|
||||
procGroup.addControllerService(serviceNode6);
|
||||
procGroup.addControllerService(serviceNode7);
|
||||
|
||||
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
|
||||
serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
|
||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
|
||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
|
||||
serviceNode5.setProperty(ServiceA.OTHER_SERVICE.getName(), "6");
|
||||
serviceNode7.setProperty(ServiceC.REQ_SERVICE_1.getName(), "2");
|
||||
serviceNode7.setProperty(ServiceC.REQ_SERVICE_2.getName(), "3");
|
||||
|
||||
provider.enableControllerServices(Arrays.asList(
|
||||
new ControllerServiceNode[] { serviceNode1, serviceNode2, serviceNode3, serviceNode4, serviceNode5, serviceNode7}));
|
||||
assertFalse(serviceNode1.isActive());
|
||||
assertFalse(serviceNode2.isActive());
|
||||
assertFalse(serviceNode3.isActive());
|
||||
assertFalse(serviceNode4.isActive());
|
||||
assertFalse(serviceNode5.isActive());
|
||||
assertFalse(serviceNode6.isActive());
|
||||
|
||||
provider.enableControllerService(serviceNode6);
|
||||
provider.enableControllerServices(Arrays.asList(
|
||||
new ControllerServiceNode[] { serviceNode1, serviceNode2, serviceNode3, serviceNode4, serviceNode5 }));
|
||||
|
||||
assertTrue(serviceNode1.isActive());
|
||||
assertTrue(serviceNode2.isActive());
|
||||
assertTrue(serviceNode3.isActive());
|
||||
assertTrue(serviceNode4.isActive());
|
||||
assertTrue(serviceNode5.isActive());
|
||||
assertTrue(serviceNode6.isActive());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.controller.service.mock;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ControllerService;
|
||||
|
||||
public class ServiceC extends AbstractControllerService {
|
||||
|
||||
public static final PropertyDescriptor REQ_SERVICE_1 = new PropertyDescriptor.Builder()
|
||||
.name("S1")
|
||||
.identifiesControllerService(ControllerService.class)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor REQ_SERVICE_2 = new PropertyDescriptor.Builder()
|
||||
.name("S2")
|
||||
.identifiesControllerService(ControllerService.class)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor OPT_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("S3")
|
||||
.identifiesControllerService(ControllerService.class)
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
descriptors.add(REQ_SERVICE_1);
|
||||
descriptors.add(REQ_SERVICE_2);
|
||||
descriptors.add(OPT_SERVICE);
|
||||
return descriptors;
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue