mirror of https://github.com/apache/nifi.git
parent
6edfa634d4
commit
2ed1ab7630
|
@ -22,8 +22,8 @@ import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
@ -170,7 +170,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ControllerServiceNode> getRequiredControllerServices() {
|
public List<ControllerServiceNode> getRequiredControllerServices() {
|
||||||
List<ControllerServiceNode> requiredServices = new ArrayList<>();
|
Set<ControllerServiceNode> requiredServices = new HashSet<>();
|
||||||
for (Entry<PropertyDescriptor, String> pEntry : this.getProperties().entrySet()) {
|
for (Entry<PropertyDescriptor, String> pEntry : this.getProperties().entrySet()) {
|
||||||
PropertyDescriptor descriptor = pEntry.getKey();
|
PropertyDescriptor descriptor = pEntry.getKey();
|
||||||
if (descriptor.getControllerServiceDefinition() != null && descriptor.isRequired()) {
|
if (descriptor.getControllerServiceDefinition() != null && descriptor.isRequired()) {
|
||||||
|
@ -179,7 +179,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
|
||||||
requiredServices.addAll(rNode.getRequiredControllerServices());
|
requiredServices.addAll(rNode.getRequiredControllerServices());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return requiredServices;
|
return new ArrayList<>(requiredServices);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,6 @@ import java.lang.reflect.Proxy;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -55,7 +54,6 @@ import org.apache.nifi.nar.ExtensionManager;
|
||||||
import org.apache.nifi.nar.NarCloseable;
|
import org.apache.nifi.nar.NarCloseable;
|
||||||
import org.apache.nifi.processor.SimpleProcessLogger;
|
import org.apache.nifi.processor.SimpleProcessLogger;
|
||||||
import org.apache.nifi.processor.StandardValidationContextFactory;
|
import org.apache.nifi.processor.StandardValidationContextFactory;
|
||||||
|
|
||||||
import org.apache.nifi.reporting.BulletinRepository;
|
import org.apache.nifi.reporting.BulletinRepository;
|
||||||
import org.apache.nifi.reporting.Severity;
|
import org.apache.nifi.reporting.Severity;
|
||||||
import org.apache.nifi.util.ObjectHolder;
|
import org.apache.nifi.util.ObjectHolder;
|
||||||
|
@ -385,17 +383,11 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
}
|
}
|
||||||
|
|
||||||
if (shouldStart) {
|
if (shouldStart) {
|
||||||
List<ControllerServiceNode> services = new ArrayList<>(serviceNodes);
|
for (ControllerServiceNode controllerServiceNode : serviceNodes) {
|
||||||
Collections.sort(services, new Comparator<ControllerServiceNode>() {
|
|
||||||
@Override
|
|
||||||
public int compare(ControllerServiceNode s1, ControllerServiceNode s2) {
|
|
||||||
return s2.getRequiredControllerServices().contains(s1) ? -1 : 1;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
for (ControllerServiceNode controllerServiceNode : services) {
|
|
||||||
try {
|
try {
|
||||||
this.enableControllerService(controllerServiceNode);
|
if (!controllerServiceNode.isActive()) {
|
||||||
|
this.enableControllerServiceDependenciesFirst(controllerServiceNode);
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("Failed to enable " + controllerServiceNode + " due to " + e);
|
logger.error("Failed to enable " + controllerServiceNode + " due to " + e);
|
||||||
if (this.bulletinRepo != null) {
|
if (this.bulletinRepo != null) {
|
||||||
|
@ -407,6 +399,18 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void enableControllerServiceDependenciesFirst(ControllerServiceNode serviceNode) {
|
||||||
|
for (ControllerServiceNode depNode : serviceNode.getRequiredControllerServices()) {
|
||||||
|
if (!depNode.isActive()) {
|
||||||
|
this.enableControllerServiceDependenciesFirst(depNode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("Enabling " + serviceNode);
|
||||||
|
}
|
||||||
|
this.enableControllerService(serviceNode);
|
||||||
|
}
|
||||||
|
|
||||||
static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) {
|
static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) {
|
||||||
final List<List<ControllerServiceNode>> orderedNodeLists = new ArrayList<>();
|
final List<List<ControllerServiceNode>> orderedNodeLists = new ArrayList<>();
|
||||||
|
|
||||||
|
|
|
@ -398,32 +398,73 @@ public class TestStandardControllerServiceProvider {
|
||||||
ProcessGroup procGroup = new MockProcessGroup();
|
ProcessGroup procGroup = new MockProcessGroup();
|
||||||
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
|
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
|
||||||
|
|
||||||
ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false);
|
ControllerServiceNode A = provider.createControllerService(ServiceA.class.getName(), "A", false);
|
||||||
ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false);
|
ControllerServiceNode B = provider.createControllerService(ServiceA.class.getName(), "B", false);
|
||||||
ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false);
|
ControllerServiceNode C = provider.createControllerService(ServiceA.class.getName(), "C", false);
|
||||||
ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false);
|
ControllerServiceNode D = provider.createControllerService(ServiceB.class.getName(), "D", false);
|
||||||
ControllerServiceNode serviceNode5 = provider.createControllerService(ServiceA.class.getName(), "5", false);
|
ControllerServiceNode E = provider.createControllerService(ServiceA.class.getName(), "E", false);
|
||||||
|
|
||||||
procGroup.addControllerService(serviceNode1);
|
procGroup.addControllerService(A);
|
||||||
procGroup.addControllerService(serviceNode2);
|
procGroup.addControllerService(B);
|
||||||
procGroup.addControllerService(serviceNode3);
|
procGroup.addControllerService(C);
|
||||||
procGroup.addControllerService(serviceNode4);
|
procGroup.addControllerService(D);
|
||||||
procGroup.addControllerService(serviceNode5);
|
procGroup.addControllerService(E);
|
||||||
|
|
||||||
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
|
A.setProperty(ServiceA.OTHER_SERVICE.getName(), "B");
|
||||||
serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
|
B.setProperty(ServiceA.OTHER_SERVICE.getName(), "D");
|
||||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
|
C.setProperty(ServiceA.OTHER_SERVICE.getName(), "B");
|
||||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
|
C.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "D");
|
||||||
serviceNode5.setProperty(ServiceA.OTHER_SERVICE.getName(), "1");
|
E.setProperty(ServiceA.OTHER_SERVICE.getName(), "A");
|
||||||
|
|
||||||
provider.enableControllerServices(
|
provider.enableControllerServices(Arrays.asList(new ControllerServiceNode[] { A, B, C, D, E }));
|
||||||
Arrays.asList(new ControllerServiceNode[] { serviceNode1, serviceNode2, serviceNode3, serviceNode4, serviceNode5}));
|
|
||||||
|
|
||||||
assertTrue(serviceNode1.isActive());
|
assertTrue(A.isActive());
|
||||||
assertTrue(serviceNode2.isActive());
|
assertTrue(B.isActive());
|
||||||
assertTrue(serviceNode3.isActive());
|
assertTrue(C.isActive());
|
||||||
assertTrue(serviceNode4.isActive());
|
assertTrue(D.isActive());
|
||||||
assertTrue(serviceNode5.isActive());
|
assertTrue(E.isActive());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test is similar to the above, but different combination of service
|
||||||
|
* dependencies
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void validateEnableServices2() {
|
||||||
|
StandardProcessScheduler scheduler = createScheduler();
|
||||||
|
FlowController controller = Mockito.mock(FlowController.class);
|
||||||
|
StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null,
|
||||||
|
stateManagerProvider);
|
||||||
|
ProcessGroup procGroup = new MockProcessGroup();
|
||||||
|
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
|
||||||
|
|
||||||
|
ControllerServiceNode A = provider.createControllerService(ServiceC.class.getName(), "A", false);
|
||||||
|
ControllerServiceNode B = provider.createControllerService(ServiceA.class.getName(), "B", false);
|
||||||
|
ControllerServiceNode C = provider.createControllerService(ServiceB.class.getName(), "C", false);
|
||||||
|
ControllerServiceNode D = provider.createControllerService(ServiceA.class.getName(), "D", false);
|
||||||
|
ControllerServiceNode F = provider.createControllerService(ServiceA.class.getName(), "F", false);
|
||||||
|
|
||||||
|
procGroup.addControllerService(A);
|
||||||
|
procGroup.addControllerService(B);
|
||||||
|
procGroup.addControllerService(C);
|
||||||
|
procGroup.addControllerService(D);
|
||||||
|
procGroup.addControllerService(F);
|
||||||
|
|
||||||
|
A.setProperty(ServiceC.REQ_SERVICE_1.getName(), "B");
|
||||||
|
A.setProperty(ServiceC.REQ_SERVICE_2.getName(), "D");
|
||||||
|
B.setProperty(ServiceA.OTHER_SERVICE.getName(), "C");
|
||||||
|
|
||||||
|
F.setProperty(ServiceA.OTHER_SERVICE.getName(), "D");
|
||||||
|
D.setProperty(ServiceA.OTHER_SERVICE.getName(), "C");
|
||||||
|
|
||||||
|
provider.enableControllerServices(Arrays.asList(new ControllerServiceNode[] { C, F, A, B, D }));
|
||||||
|
|
||||||
|
assertTrue(A.isActive());
|
||||||
|
assertTrue(B.isActive());
|
||||||
|
assertTrue(C.isActive());
|
||||||
|
assertTrue(D.isActive());
|
||||||
|
assertTrue(F.isActive());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue