mirror of https://github.com/apache/nifi.git
NIFI-2032 fixed 'enableControllerServices' logic
added getRequiredControllerServices() operation to ControllerServiceNode
This commit is contained in:
parent
c955ec1689
commit
6402eb9315
|
@ -20,6 +20,7 @@ import org.apache.nifi.controller.ConfiguredComponent;
|
||||||
import org.apache.nifi.controller.ControllerService;
|
import org.apache.nifi.controller.ControllerService;
|
||||||
import org.apache.nifi.groups.ProcessGroup;
|
import org.apache.nifi.groups.ProcessGroup;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
|
||||||
|
@ -50,6 +51,16 @@ public interface ControllerServiceNode extends ConfiguredComponent {
|
||||||
*/
|
*/
|
||||||
ControllerService getProxiedControllerService();
|
ControllerService getProxiedControllerService();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the list of services that are required to be enabled before this
|
||||||
|
* service is enabled. The returned list is flattened and contains both
|
||||||
|
* immediate and transient dependencies.
|
||||||
|
*
|
||||||
|
* @return list of services required to be enabled before this service is
|
||||||
|
* enabled
|
||||||
|
*/
|
||||||
|
List<ControllerServiceNode> getRequiredControllerServices();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* Returns the actual implementation of the Controller Service that this ControllerServiceNode
|
* Returns the actual implementation of the Controller Service that this ControllerServiceNode
|
||||||
|
|
|
@ -17,10 +17,13 @@
|
||||||
package org.apache.nifi.controller.service;
|
package org.apache.nifi.controller.service;
|
||||||
|
|
||||||
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.Map.Entry;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
@ -35,6 +38,7 @@ import org.apache.nifi.authorization.Resource;
|
||||||
import org.apache.nifi.authorization.resource.Authorizable;
|
import org.apache.nifi.authorization.resource.Authorizable;
|
||||||
import org.apache.nifi.authorization.resource.ResourceFactory;
|
import org.apache.nifi.authorization.resource.ResourceFactory;
|
||||||
import org.apache.nifi.authorization.resource.ResourceType;
|
import org.apache.nifi.authorization.resource.ResourceType;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.ValidationResult;
|
import org.apache.nifi.components.ValidationResult;
|
||||||
import org.apache.nifi.controller.AbstractConfiguredComponent;
|
import org.apache.nifi.controller.AbstractConfiguredComponent;
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
|
@ -164,6 +168,21 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<ControllerServiceNode> getRequiredControllerServices() {
|
||||||
|
List<ControllerServiceNode> requiredServices = new ArrayList<>();
|
||||||
|
for (Entry<PropertyDescriptor, String> pEntry : this.getProperties().entrySet()) {
|
||||||
|
PropertyDescriptor descriptor = pEntry.getKey();
|
||||||
|
if (descriptor.getControllerServiceDefinition() != null && descriptor.isRequired()) {
|
||||||
|
ControllerServiceNode rNode = this.processGroup.getControllerService(pEntry.getValue());
|
||||||
|
requiredServices.add(rNode);
|
||||||
|
requiredServices.addAll(rNode.getRequiredControllerServices());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return requiredServices;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void removeReference(final ConfiguredComponent referencingComponent) {
|
public void removeReference(final ConfiguredComponent referencingComponent) {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
|
|
|
@ -25,14 +25,13 @@ import java.lang.reflect.Proxy;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.Comparator;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.ThreadFactory;
|
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnAdded;
|
import org.apache.nifi.annotation.lifecycle.OnAdded;
|
||||||
|
@ -50,7 +49,6 @@ import org.apache.nifi.controller.ScheduledState;
|
||||||
import org.apache.nifi.controller.ValidationContextFactory;
|
import org.apache.nifi.controller.ValidationContextFactory;
|
||||||
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
|
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
|
||||||
import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
|
import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
|
||||||
import org.apache.nifi.events.BulletinFactory;
|
|
||||||
import org.apache.nifi.groups.ProcessGroup;
|
import org.apache.nifi.groups.ProcessGroup;
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
import org.apache.nifi.nar.ExtensionManager;
|
import org.apache.nifi.nar.ExtensionManager;
|
||||||
|
@ -58,7 +56,6 @@ import org.apache.nifi.nar.NarCloseable;
|
||||||
import org.apache.nifi.processor.SimpleProcessLogger;
|
import org.apache.nifi.processor.SimpleProcessLogger;
|
||||||
import org.apache.nifi.processor.StandardValidationContextFactory;
|
import org.apache.nifi.processor.StandardValidationContextFactory;
|
||||||
import org.apache.nifi.reporting.BulletinRepository;
|
import org.apache.nifi.reporting.BulletinRepository;
|
||||||
import org.apache.nifi.reporting.Severity;
|
|
||||||
import org.apache.nifi.util.ObjectHolder;
|
import org.apache.nifi.util.ObjectHolder;
|
||||||
import org.apache.nifi.util.ReflectionUtils;
|
import org.apache.nifi.util.ReflectionUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -70,7 +67,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
|
|
||||||
private final ProcessScheduler processScheduler;
|
private final ProcessScheduler processScheduler;
|
||||||
private static final Set<Method> validDisabledMethods;
|
private static final Set<Method> validDisabledMethods;
|
||||||
private final BulletinRepository bulletinRepo;
|
// private final BulletinRepository bulletinRepo;
|
||||||
private final StateManagerProvider stateManagerProvider;
|
private final StateManagerProvider stateManagerProvider;
|
||||||
private final FlowController flowController;
|
private final FlowController flowController;
|
||||||
|
|
||||||
|
@ -91,7 +88,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
|
|
||||||
this.flowController = flowController;
|
this.flowController = flowController;
|
||||||
this.processScheduler = scheduler;
|
this.processScheduler = scheduler;
|
||||||
this.bulletinRepo = bulletinRepo;
|
// this.bulletinRepo = bulletinRepo;
|
||||||
this.stateManagerProvider = stateManagerProvider;
|
this.stateManagerProvider = stateManagerProvider;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -372,98 +369,32 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) {
|
public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) {
|
||||||
final Set<ControllerServiceNode> servicesToEnable = new HashSet<>();
|
boolean shouldStart = true;
|
||||||
// Ensure that all nodes are already disabled
|
|
||||||
for (final ControllerServiceNode serviceNode : serviceNodes) {
|
|
||||||
final ControllerServiceState curState = serviceNode.getState();
|
|
||||||
if (ControllerServiceState.DISABLED.equals(curState)) {
|
|
||||||
servicesToEnable.add(serviceNode);
|
|
||||||
} else {
|
|
||||||
logger.warn("Cannot enable {} because it is not disabled; current state is {}", serviceNode, curState);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// determine the order to load the services. We have to ensure that if service A references service B, then B
|
Iterator<ControllerServiceNode> serviceIter = serviceNodes.iterator();
|
||||||
// is enabled first, and so on.
|
while (serviceIter.hasNext() && shouldStart) {
|
||||||
final Map<String, ControllerServiceNode> idToNodeMap = new HashMap<>();
|
ControllerServiceNode controllerServiceNode = serviceIter.next();
|
||||||
for (final ControllerServiceNode node : servicesToEnable) {
|
List<ControllerServiceNode> requiredServices = ((StandardControllerServiceNode) controllerServiceNode).getRequiredControllerServices();
|
||||||
idToNodeMap.put(node.getIdentifier(), node);
|
for (ControllerServiceNode requiredService : requiredServices) {
|
||||||
}
|
if (!requiredService.isActive() && !serviceNodes.contains(requiredService)) {
|
||||||
|
shouldStart = false;
|
||||||
// We can have many Controller Services dependent on one another. We can have many of these
|
|
||||||
// disparate lists of Controller Services that are dependent on one another. We refer to each
|
|
||||||
// of these as a branch.
|
|
||||||
final List<List<ControllerServiceNode>> branches = determineEnablingOrder(idToNodeMap);
|
|
||||||
|
|
||||||
if (branches.isEmpty()) {
|
|
||||||
logger.info("No Controller Services to enable");
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
logger.info("Will enable {} Controller Services", servicesToEnable.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
final Set<ControllerServiceNode> enabledNodes = Collections.synchronizedSet(new HashSet<ControllerServiceNode>());
|
|
||||||
final ExecutorService executor = Executors.newFixedThreadPool(Math.min(10, branches.size()), new ThreadFactory() {
|
|
||||||
@Override
|
|
||||||
public Thread newThread(final Runnable r) {
|
|
||||||
final Thread t = Executors.defaultThreadFactory().newThread(r);
|
|
||||||
t.setDaemon(true);
|
|
||||||
t.setName("Enable Controller Services");
|
|
||||||
return t;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
for (final List<ControllerServiceNode> branch : branches) {
|
|
||||||
final Runnable enableBranchRunnable = new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
logger.debug("Enabling Controller Service Branch {}", branch);
|
|
||||||
|
|
||||||
for (final ControllerServiceNode serviceNode : branch) {
|
|
||||||
try {
|
|
||||||
if (!enabledNodes.contains(serviceNode)) {
|
|
||||||
enabledNodes.add(serviceNode);
|
|
||||||
|
|
||||||
logger.info("Enabling {}", serviceNode);
|
|
||||||
try {
|
|
||||||
serviceNode.verifyCanEnable();
|
|
||||||
processScheduler.enableControllerService(serviceNode);
|
|
||||||
} catch (final Exception e) {
|
|
||||||
logger.error("Failed to enable " + serviceNode + " due to " + e);
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.error("", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (bulletinRepo != null) {
|
|
||||||
bulletinRepo.addBulletin(BulletinFactory.createBulletin(
|
|
||||||
"Controller Service", Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// wait for service to finish enabling.
|
|
||||||
while (ControllerServiceState.ENABLING.equals(serviceNode.getState())) {
|
|
||||||
try {
|
|
||||||
Thread.sleep(100L);
|
|
||||||
} catch (final InterruptedException ie) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info("State for {} is now {}", serviceNode, serviceNode.getState());
|
|
||||||
} catch (final Exception e) {
|
|
||||||
logger.error("Failed to enable {} due to {}", serviceNode, e.toString());
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.error("", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
|
|
||||||
executor.submit(enableBranchRunnable);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
executor.shutdown();
|
if (shouldStart) {
|
||||||
|
List<ControllerServiceNode> services = new ArrayList<>(serviceNodes);
|
||||||
|
Collections.sort(services, new Comparator<ControllerServiceNode>() {
|
||||||
|
@Override
|
||||||
|
public int compare(ControllerServiceNode s1, ControllerServiceNode s2) {
|
||||||
|
return s2.getRequiredControllerServices().contains(s1) ? -1 : 1;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
for (ControllerServiceNode controllerServiceNode : services) {
|
||||||
|
this.enableControllerService(controllerServiceNode);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) {
|
static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) {
|
||||||
|
|
|
@ -17,9 +17,11 @@
|
||||||
package org.apache.nifi.controller.service;
|
package org.apache.nifi.controller.service;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.beans.PropertyDescriptor;
|
import java.beans.PropertyDescriptor;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -39,6 +41,7 @@ import org.apache.nifi.controller.service.mock.DummyProcessor;
|
||||||
import org.apache.nifi.controller.service.mock.MockProcessGroup;
|
import org.apache.nifi.controller.service.mock.MockProcessGroup;
|
||||||
import org.apache.nifi.controller.service.mock.ServiceA;
|
import org.apache.nifi.controller.service.mock.ServiceA;
|
||||||
import org.apache.nifi.controller.service.mock.ServiceB;
|
import org.apache.nifi.controller.service.mock.ServiceB;
|
||||||
|
import org.apache.nifi.controller.service.mock.ServiceC;
|
||||||
import org.apache.nifi.groups.ProcessGroup;
|
import org.apache.nifi.groups.ProcessGroup;
|
||||||
import org.apache.nifi.groups.StandardProcessGroup;
|
import org.apache.nifi.groups.StandardProcessGroup;
|
||||||
import org.apache.nifi.processor.StandardValidationContextFactory;
|
import org.apache.nifi.processor.StandardValidationContextFactory;
|
||||||
|
@ -386,4 +389,93 @@ public class TestStandardControllerServiceProvider {
|
||||||
provider.unscheduleReferencingComponents(serviceNode);
|
provider.unscheduleReferencingComponents(serviceNode);
|
||||||
assertEquals(ScheduledState.STOPPED, procNode.getScheduledState());
|
assertEquals(ScheduledState.STOPPED, procNode.getScheduledState());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void validateEnableServices() {
|
||||||
|
StandardProcessScheduler scheduler = createScheduler();
|
||||||
|
FlowController controller = Mockito.mock(FlowController.class);
|
||||||
|
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider);
|
||||||
|
ProcessGroup procGroup = new MockProcessGroup();
|
||||||
|
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
|
||||||
|
|
||||||
|
ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false);
|
||||||
|
ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false);
|
||||||
|
ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false);
|
||||||
|
ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false);
|
||||||
|
ControllerServiceNode serviceNode5 = provider.createControllerService(ServiceA.class.getName(), "5", false);
|
||||||
|
|
||||||
|
procGroup.addControllerService(serviceNode1);
|
||||||
|
procGroup.addControllerService(serviceNode2);
|
||||||
|
procGroup.addControllerService(serviceNode3);
|
||||||
|
procGroup.addControllerService(serviceNode4);
|
||||||
|
procGroup.addControllerService(serviceNode5);
|
||||||
|
|
||||||
|
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
|
||||||
|
serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
|
||||||
|
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
|
||||||
|
serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
|
||||||
|
serviceNode5.setProperty(ServiceA.OTHER_SERVICE.getName(), "1");
|
||||||
|
|
||||||
|
provider.enableControllerServices(
|
||||||
|
Arrays.asList(new ControllerServiceNode[] { serviceNode1, serviceNode2, serviceNode3, serviceNode4, serviceNode5}));
|
||||||
|
|
||||||
|
assertTrue(serviceNode1.isActive());
|
||||||
|
assertTrue(serviceNode2.isActive());
|
||||||
|
assertTrue(serviceNode3.isActive());
|
||||||
|
assertTrue(serviceNode4.isActive());
|
||||||
|
assertTrue(serviceNode5.isActive());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void validateEnableServicesWithDisabledMissingService() {
|
||||||
|
StandardProcessScheduler scheduler = createScheduler();
|
||||||
|
FlowController controller = Mockito.mock(FlowController.class);
|
||||||
|
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider);
|
||||||
|
ProcessGroup procGroup = new MockProcessGroup();
|
||||||
|
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
|
||||||
|
|
||||||
|
ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false);
|
||||||
|
ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false);
|
||||||
|
ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false);
|
||||||
|
ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false);
|
||||||
|
ControllerServiceNode serviceNode5 = provider.createControllerService(ServiceA.class.getName(), "5", false);
|
||||||
|
ControllerServiceNode serviceNode6 = provider.createControllerService(ServiceB.class.getName(), "6", false);
|
||||||
|
ControllerServiceNode serviceNode7 = provider.createControllerService(ServiceC.class.getName(), "7", false);
|
||||||
|
|
||||||
|
procGroup.addControllerService(serviceNode1);
|
||||||
|
procGroup.addControllerService(serviceNode2);
|
||||||
|
procGroup.addControllerService(serviceNode3);
|
||||||
|
procGroup.addControllerService(serviceNode4);
|
||||||
|
procGroup.addControllerService(serviceNode5);
|
||||||
|
procGroup.addControllerService(serviceNode6);
|
||||||
|
procGroup.addControllerService(serviceNode7);
|
||||||
|
|
||||||
|
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
|
||||||
|
serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
|
||||||
|
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
|
||||||
|
serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
|
||||||
|
serviceNode5.setProperty(ServiceA.OTHER_SERVICE.getName(), "6");
|
||||||
|
serviceNode7.setProperty(ServiceC.REQ_SERVICE_1.getName(), "2");
|
||||||
|
serviceNode7.setProperty(ServiceC.REQ_SERVICE_2.getName(), "3");
|
||||||
|
|
||||||
|
provider.enableControllerServices(Arrays.asList(
|
||||||
|
new ControllerServiceNode[] { serviceNode1, serviceNode2, serviceNode3, serviceNode4, serviceNode5, serviceNode7}));
|
||||||
|
assertFalse(serviceNode1.isActive());
|
||||||
|
assertFalse(serviceNode2.isActive());
|
||||||
|
assertFalse(serviceNode3.isActive());
|
||||||
|
assertFalse(serviceNode4.isActive());
|
||||||
|
assertFalse(serviceNode5.isActive());
|
||||||
|
assertFalse(serviceNode6.isActive());
|
||||||
|
|
||||||
|
provider.enableControllerService(serviceNode6);
|
||||||
|
provider.enableControllerServices(Arrays.asList(
|
||||||
|
new ControllerServiceNode[] { serviceNode1, serviceNode2, serviceNode3, serviceNode4, serviceNode5 }));
|
||||||
|
|
||||||
|
assertTrue(serviceNode1.isActive());
|
||||||
|
assertTrue(serviceNode2.isActive());
|
||||||
|
assertTrue(serviceNode3.isActive());
|
||||||
|
assertTrue(serviceNode4.isActive());
|
||||||
|
assertTrue(serviceNode5.isActive());
|
||||||
|
assertTrue(serviceNode6.isActive());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,55 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.controller.service.mock;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.controller.AbstractControllerService;
|
||||||
|
import org.apache.nifi.controller.ControllerService;
|
||||||
|
|
||||||
|
public class ServiceC extends AbstractControllerService {
|
||||||
|
|
||||||
|
public static final PropertyDescriptor REQ_SERVICE_1 = new PropertyDescriptor.Builder()
|
||||||
|
.name("S1")
|
||||||
|
.identifiesControllerService(ControllerService.class)
|
||||||
|
.required(true)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor REQ_SERVICE_2 = new PropertyDescriptor.Builder()
|
||||||
|
.name("S2")
|
||||||
|
.identifiesControllerService(ControllerService.class)
|
||||||
|
.required(true)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor OPT_SERVICE = new PropertyDescriptor.Builder()
|
||||||
|
.name("S3")
|
||||||
|
.identifiesControllerService(ControllerService.class)
|
||||||
|
.required(false)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||||
|
descriptors.add(REQ_SERVICE_1);
|
||||||
|
descriptors.add(REQ_SERVICE_2);
|
||||||
|
descriptors.add(OPT_SERVICE);
|
||||||
|
return descriptors;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue