mirror of https://github.com/apache/nifi.git
NIFI-13755 Improved Controller Service Enabling Process (#9273)
- Updated Standard Controller Service Provider to enable requested Controller Services that do not depend on a disabled Controller Service, instead of failing to enable the entire set of requested Controller Services - Updated enabling process improves behavior when restarting NiFi and attempting to enable large numbers of Controller Services, some of which depend on disabled Services Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
56f859f37f
commit
1aabc615ce
|
@ -252,23 +252,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) {
|
public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodesIn) {
|
||||||
boolean shouldStart = true;
|
Collection<ControllerServiceNode> serviceNodes = new HashSet<>(serviceNodesIn);
|
||||||
|
for (ControllerServiceNode controllerServiceNode : removeControllerServicesWithUnavailableRequirements(serviceNodes)) {
|
||||||
Iterator<ControllerServiceNode> serviceIter = serviceNodes.iterator();
|
|
||||||
while (serviceIter.hasNext() && shouldStart) {
|
|
||||||
ControllerServiceNode controllerServiceNode = serviceIter.next();
|
|
||||||
List<ControllerServiceNode> requiredServices = controllerServiceNode.getRequiredControllerServices();
|
|
||||||
for (ControllerServiceNode requiredService : requiredServices) {
|
|
||||||
if (!requiredService.isActive() && !serviceNodes.contains(requiredService)) {
|
|
||||||
shouldStart = false;
|
|
||||||
logger.debug("Will not start {} because required service {} is not active and is not part of the collection of things to start", serviceNodes, requiredService);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (shouldStart) {
|
|
||||||
for (ControllerServiceNode controllerServiceNode : serviceNodes) {
|
|
||||||
try {
|
try {
|
||||||
final Future<Void> future = enableControllerServiceAndDependencies(controllerServiceNode);
|
final Future<Void> future = enableControllerServiceAndDependencies(controllerServiceNode);
|
||||||
|
|
||||||
|
@ -285,6 +271,30 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Collection<ControllerServiceNode> removeControllerServicesWithUnavailableRequirements(final Collection<ControllerServiceNode> serviceNodes) {
|
||||||
|
boolean recheckNeeded;
|
||||||
|
do {
|
||||||
|
recheckNeeded = false;
|
||||||
|
for (Iterator<ControllerServiceNode> iter = serviceNodes.iterator(); iter.hasNext();) {
|
||||||
|
boolean skipStarting = false;
|
||||||
|
final ControllerServiceNode serviceNode = iter.next();
|
||||||
|
final List<ControllerServiceNode> requiredServices = serviceNode.getRequiredControllerServices();
|
||||||
|
for (ControllerServiceNode requiredService : requiredServices) {
|
||||||
|
if (!requiredService.isActive() && !serviceNodes.contains(requiredService)) {
|
||||||
|
skipStarting = true;
|
||||||
|
logger.error("Will not start {} because its required service {} is not active and is not part of the collection of things to start", serviceNode, requiredService);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (skipStarting) {
|
||||||
|
// If any service was removed, then recheck all remaining services because the removed one might be required by another service in the list.
|
||||||
|
recheckNeeded = true;
|
||||||
|
iter.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} while (recheckNeeded);
|
||||||
|
|
||||||
|
return serviceNodes;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -37,9 +37,15 @@ import org.junit.jupiter.api.BeforeAll;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class StandardControllerServiceProviderTest {
|
public class StandardControllerServiceProviderTest {
|
||||||
|
|
||||||
|
@ -69,7 +75,7 @@ public class StandardControllerServiceProviderTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
private ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final ControllerServiceProvider serviceProvider) {
|
private ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final ControllerServiceProvider serviceProvider) {
|
||||||
final ControllerServiceNode serviceNode = new ExtensionBuilder()
|
return new ExtensionBuilder()
|
||||||
.identifier(id)
|
.identifier(id)
|
||||||
.type(type)
|
.type(type)
|
||||||
.bundleCoordinate(bundleCoordinate)
|
.bundleCoordinate(bundleCoordinate)
|
||||||
|
@ -81,8 +87,6 @@ public class StandardControllerServiceProviderTest {
|
||||||
.stateManagerProvider(Mockito.mock(StateManagerProvider.class))
|
.stateManagerProvider(Mockito.mock(StateManagerProvider.class))
|
||||||
.extensionManager(extensionManager)
|
.extensionManager(extensionManager)
|
||||||
.buildControllerService();
|
.buildControllerService();
|
||||||
|
|
||||||
return serviceNode;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -106,4 +110,61 @@ public class StandardControllerServiceProviderTest {
|
||||||
public void testCallImplementationInitialized() throws InitializationException {
|
public void testCallImplementationInitialized() throws InitializationException {
|
||||||
implementation.initialize(null);
|
implementation.initialize(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ControllerServiceNode populateControllerService(ControllerServiceNode requiredService) { // Collection<ControllerServiceNode> serviceNodes) {
|
||||||
|
ControllerServiceNode controllerServiceNode = mock(ControllerServiceNode.class);
|
||||||
|
ArrayList<ControllerServiceNode> requiredServices = new ArrayList<>();
|
||||||
|
if (requiredService != null) {
|
||||||
|
requiredServices.add(requiredService);
|
||||||
|
}
|
||||||
|
when(controllerServiceNode.getRequiredControllerServices()).thenReturn(requiredServices);
|
||||||
|
return controllerServiceNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEnableControllerServicesAllAreEnabled() {
|
||||||
|
final CompletableFuture<Void> future = new CompletableFuture<>();
|
||||||
|
future.complete(null);
|
||||||
|
|
||||||
|
ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
|
||||||
|
when(scheduler.enableControllerService(any())).thenReturn(future);
|
||||||
|
ControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, Mockito.mock(FlowManager.class), Mockito.mock(ExtensionManager.class));
|
||||||
|
|
||||||
|
final ArrayList<ControllerServiceNode> serviceNodes = new ArrayList<>();
|
||||||
|
serviceNodes.add(populateControllerService(null));
|
||||||
|
serviceNodes.add(populateControllerService(null));
|
||||||
|
provider.enableControllerServices(serviceNodes);
|
||||||
|
verify(scheduler).enableControllerService(serviceNodes.get(0));
|
||||||
|
verify(scheduler).enableControllerService(serviceNodes.get(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEnableControllerServicesSomeAreEnabled() {
|
||||||
|
final CompletableFuture<Void> future = new CompletableFuture<>();
|
||||||
|
future.complete(null);
|
||||||
|
|
||||||
|
ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
|
||||||
|
when(scheduler.enableControllerService(any())).thenReturn(future);
|
||||||
|
ControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null, Mockito.mock(FlowManager.class), Mockito.mock(ExtensionManager.class));
|
||||||
|
|
||||||
|
final ArrayList<ControllerServiceNode> serviceNodes = new ArrayList<>();
|
||||||
|
ControllerServiceNode disabledController = populateControllerService(null);
|
||||||
|
// Do not start because disabledController is not in the serviceNodes (list of services to start)
|
||||||
|
serviceNodes.add(populateControllerService(disabledController));
|
||||||
|
// Start this service because it has no required services
|
||||||
|
serviceNodes.add(populateControllerService(null));
|
||||||
|
// Do not start because its required service is not started because the required service
|
||||||
|
// depends on disabledController which is not in the serviceNodes (list of services to start)
|
||||||
|
serviceNodes.add(populateControllerService(serviceNodes.get(0)));
|
||||||
|
// Do not start because its required service depends on disabledController through 2 other levels of services
|
||||||
|
serviceNodes.add(populateControllerService(serviceNodes.get(2)));
|
||||||
|
// Start this service because it has a required service which is in the list of services to start
|
||||||
|
serviceNodes.add(populateControllerService(serviceNodes.get(1)));
|
||||||
|
provider.enableControllerServices(serviceNodes);
|
||||||
|
verify(scheduler, Mockito.times(0)).enableControllerService(serviceNodes.get(0));
|
||||||
|
verify(scheduler, Mockito.times(2)).enableControllerService(serviceNodes.get(1));
|
||||||
|
verify(scheduler, Mockito.times(0)).enableControllerService(serviceNodes.get(2));
|
||||||
|
verify(scheduler, Mockito.times(0)).enableControllerService(serviceNodes.get(3));
|
||||||
|
verify(scheduler, Mockito.times(1)).enableControllerService(serviceNodes.get(4));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -576,10 +576,10 @@ public class TestStandardControllerServiceProvider {
|
||||||
allBut6.stream().forEach(ControllerServiceNode::performValidation);
|
allBut6.stream().forEach(ControllerServiceNode::performValidation);
|
||||||
|
|
||||||
provider.enableControllerServices(allBut6);
|
provider.enableControllerServices(allBut6);
|
||||||
assertFalse(serviceNode1.isActive());
|
assertTrue(serviceNode1.isActive());
|
||||||
assertFalse(serviceNode2.isActive());
|
assertTrue(serviceNode2.isActive());
|
||||||
assertFalse(serviceNode3.isActive());
|
assertTrue(serviceNode3.isActive());
|
||||||
assertFalse(serviceNode4.isActive());
|
assertTrue(serviceNode4.isActive());
|
||||||
assertFalse(serviceNode5.isActive());
|
assertFalse(serviceNode5.isActive());
|
||||||
assertFalse(serviceNode6.isActive());
|
assertFalse(serviceNode6.isActive());
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue