NIFI-3972: This closes #1855. When enabling Controller Services on startup, wait until service is enabled (or until 30 seconds elapses, whichever comes first) before returning. This avoids having Service A depend on Service B and then attempting to start Service A before Service B is fully enabled

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2017-05-25 12:36:18 -04:00 committed by joewitt
parent eb25c8547a
commit 36911957dc
1 changed files with 24 additions and 6 deletions

View File

@ -24,12 +24,14 @@ import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -339,6 +341,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
for (ControllerServiceNode requiredService : requiredServices) { for (ControllerServiceNode requiredService : requiredServices) {
if (!requiredService.isActive() && !serviceNodes.contains(requiredService)) { if (!requiredService.isActive() && !serviceNodes.contains(requiredService)) {
shouldStart = false; shouldStart = false;
logger.debug("Will not start {} because required service {} is not active and is not part of the collection of things to start", serviceNodes, requiredService);
} }
} }
} }
@ -347,7 +350,15 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
for (ControllerServiceNode controllerServiceNode : serviceNodes) { for (ControllerServiceNode controllerServiceNode : serviceNodes) {
try { try {
if (!controllerServiceNode.isActive()) { if (!controllerServiceNode.isActive()) {
this.enableControllerServiceDependenciesFirst(controllerServiceNode); final Future<Void> future = enableControllerServiceDependenciesFirst(controllerServiceNode);
try {
future.get(30, TimeUnit.SECONDS);
logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState());
} catch (final Exception e) {
logger.warn("Failed to enable service {}", controllerServiceNode, e);
// Nothing we can really do. Will attempt to enable this service anyway.
}
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("Failed to enable " + controllerServiceNode, e); logger.error("Failed to enable " + controllerServiceNode, e);
@ -361,26 +372,33 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
} }
private Future<Void> enableControllerServiceDependenciesFirst(ControllerServiceNode serviceNode) { private Future<Void> enableControllerServiceDependenciesFirst(ControllerServiceNode serviceNode) {
final List<Future<Void>> futures = new ArrayList<>(); final Map<ControllerServiceNode, Future<Void>> futures = new HashMap<>();
for (ControllerServiceNode depNode : serviceNode.getRequiredControllerServices()) { for (ControllerServiceNode depNode : serviceNode.getRequiredControllerServices()) {
if (!depNode.isActive()) { if (!depNode.isActive()) {
futures.add(this.enableControllerServiceDependenciesFirst(depNode)); logger.debug("Before enabling {}, will enable dependent Controller Service {}", serviceNode, depNode);
futures.put(depNode, this.enableControllerServiceDependenciesFirst(depNode));
} }
} }
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Enabling " + serviceNode); logger.debug("All dependent services for {} have now begun enabling. Will wait for them to complete", serviceNode);
} }
for (final Future<Void> future : futures) { for (final Map.Entry<ControllerServiceNode, Future<Void>> entry : futures.entrySet()) {
final ControllerServiceNode dependentService = entry.getKey();
final Future<Void> future = entry.getValue();
try { try {
future.get(); future.get(30, TimeUnit.SECONDS);
logger.debug("Successfully enabled dependent service {}; service state = {}", dependentService, dependentService.getState());
} catch (final Exception e) { } catch (final Exception e) {
logger.error("Failed to enable service {}, so may be unable to enable {}", dependentService, serviceNode, e);
// Nothing we can really do. Will attempt to enable this service anyway. // Nothing we can really do. Will attempt to enable this service anyway.
} }
} }
logger.debug("All dependent services have been enabled for {}; will now start service itself", serviceNode);
return this.enableControllerService(serviceNode); return this.enableControllerService(serviceNode);
} }