NIFI-5394: Ensure that we wait for service to be fully enabled when enabling a group, before moving on to the next in the list

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2867
This commit is contained in:
Mark Payne 2018-07-09 10:01:37 -04:00 committed by Matthew Burgess
parent 0ff8518068
commit 1e75f8c789
3 changed files with 16 additions and 5 deletions

View File

@ -336,7 +336,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
final Collection<ValidationResult> validationErrors = getValidationErrors(ignoredReferences);
if (ignoredReferences != null && !validationErrors.isEmpty()) {
throw new IllegalStateException("Controller Service with ID " + getIdentifier() + " cannot be enabled because it is not currently valid");
throw new IllegalStateException("Controller Service with ID " + getIdentifier() + " cannot be enabled because it is not currently valid: " + validationErrors);
}
}

View File

@ -33,6 +33,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -747,7 +748,16 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
for (final ControllerServiceNode nodeToEnable : recursiveReferences) {
if (!nodeToEnable.isActive()) {
logger.debug("Enabling {} because it references {}", nodeToEnable, serviceNode);
enableControllerService(nodeToEnable);
final Future<?> enableFuture = enableControllerService(nodeToEnable);
try {
enableFuture.get();
} catch (final ExecutionException ee) {
throw new IllegalStateException("Failed to enable Controller Service " + nodeToEnable, ee.getCause());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted while enabling Controller Service");
}
updated.add(nodeToEnable);
}
}

View File

@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.components.state.StateManager;
@ -90,14 +91,14 @@ public class StandardControllerServiceProviderIT {
* https://issues.apache.org/jira/browse/NIFI-1143
*/
@Test(timeout = 120000)
public void testConcurrencyWithEnablingReferencingServicesGraph() throws InterruptedException {
public void testConcurrencyWithEnablingReferencingServicesGraph() throws InterruptedException, ExecutionException {
final StandardProcessScheduler scheduler = new StandardProcessScheduler(new FlowEngine(1, "Unit Test", true), null, null, stateManagerProvider, niFiProperties);
for (int i = 0; i < 5000; i++) {
testEnableReferencingServicesGraph(scheduler);
}
}
public void testEnableReferencingServicesGraph(final StandardProcessScheduler scheduler) {
public void testEnableReferencingServicesGraph(final StandardProcessScheduler scheduler) throws InterruptedException, ExecutionException {
final FlowController controller = Mockito.mock(FlowController.class);
final ProcessGroup procGroup = new MockProcessGroup(controller);
Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
@ -136,7 +137,7 @@ public class StandardControllerServiceProviderIT {
setProperty(serviceNode3, ServiceA.OTHER_SERVICE.getName(), "2");
setProperty(serviceNode3, ServiceA.OTHER_SERVICE_2.getName(), "4");
provider.enableControllerService(serviceNode4);
provider.enableControllerService(serviceNode4).get();
provider.enableReferencingServices(serviceNode4);
// Verify that the services are either ENABLING or ENABLED, and wait for all of them to become ENABLED.