NIFI-5012: When connecting to cluster, esure that controller services appropriately enabled/disabled

This closes #2579.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2018-03-23 15:44:06 -04:00 committed by Bryan Bende
parent 3612fbe522
commit 456c9c8fc0
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
1 changed files with 44 additions and 0 deletions

View File

@ -73,6 +73,7 @@ import org.apache.nifi.controller.serialization.FlowSynchronizer;
import org.apache.nifi.controller.serialization.StandardFlowSerializer; import org.apache.nifi.controller.serialization.StandardFlowSerializer;
import org.apache.nifi.controller.service.ControllerServiceLoader; import org.apache.nifi.controller.service.ControllerServiceLoader;
import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.fingerprint.FingerprintException; import org.apache.nifi.fingerprint.FingerprintException;
@ -749,6 +750,19 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
} }
} }
private ControllerServiceState getFinalTransitionState(final ControllerServiceState state) {
switch (state) {
case DISABLED:
case DISABLING:
return ControllerServiceState.DISABLED;
case ENABLED:
case ENABLING:
return ControllerServiceState.ENABLED;
default:
throw new AssertionError();
}
}
private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement,
final StringEncryptor encryptor, final FlowEncodingVersion encodingVersion) throws ProcessorInstantiationException { final StringEncryptor encryptor, final FlowEncodingVersion encodingVersion) throws ProcessorInstantiationException {
@ -779,6 +793,36 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
// get the real process group and ID // get the real process group and ID
final ProcessGroup processGroup = controller.getGroup(processGroupDto.getId()); final ProcessGroup processGroup = controller.getGroup(processGroupDto.getId());
// determine the scheduled state of all of the Controller Service
final List<Element> controllerServiceNodeList = getChildrenByTagName(processGroupElement, "controllerService");
final Set<ControllerServiceNode> toDisable = new HashSet<>();
final Set<ControllerServiceNode> toEnable = new HashSet<>();
for (final Element serviceElement : controllerServiceNodeList) {
final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(serviceElement, encryptor);
final ControllerServiceNode serviceNode = processGroup.getControllerService(dto.getId());
// Check if the controller service is in the correct state. We consider it the correct state if
// we are in a transitional state and heading in the right direction or already in the correct state.
// E.g., it is the correct state if it should be 'DISABLED' and it is either DISABLED or DISABLING.
final ControllerServiceState serviceState = getFinalTransitionState(serviceNode.getState());
final ControllerServiceState clusterState = getFinalTransitionState(ControllerServiceState.valueOf(dto.getState()));
if (serviceState != clusterState) {
switch (clusterState) {
case DISABLED:
toDisable.add(serviceNode);
break;
case ENABLED:
toEnable.add(serviceNode);
break;
}
}
}
controller.disableControllerServicesAsync(toDisable);
controller.enableControllerServices(toEnable);
// processors & ports cannot be updated - they must be the same. Except for the scheduled state. // processors & ports cannot be updated - they must be the same. Except for the scheduled state.
final List<Element> processorNodeList = getChildrenByTagName(processGroupElement, "processor"); final List<Element> processorNodeList = getChildrenByTagName(processGroupElement, "processor");
for (final Element processorElement : processorNodeList) { for (final Element processorElement : processorNodeList) {