From 41c0f19e48cab615415be8a3ffbb6e467815f630 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 18 Jul 2016 10:50:39 -0400 Subject: [PATCH] NIFI-2305: Do not run processors that are marked as Primary Node Only if disconnected from cluster This closes #667 Signed-off-by: jpercivall --- .../org/apache/nifi/controller/FlowController.java | 11 ++++++++--- .../controller/tasks/ContinuallyRunProcessorTask.java | 6 +++--- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index ac004e1fda..4670605a54 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -3044,7 +3044,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @throws IllegalStateException if not configured for clustering */ public void startHeartbeating() throws IllegalStateException { - if (!configuredForClustering) { + if (!isConfiguredForClustering()) { throw new IllegalStateException("Unable to start heartbeating because heartbeating is not configured."); } @@ -3082,7 +3082,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @throws IllegalStateException if not clustered */ public void stopHeartbeating() throws IllegalStateException { - if (!configuredForClustering) { + if (!isConfiguredForClustering()) { throw new IllegalStateException("Unable to stop heartbeating because heartbeating is not configured."); } @@ -3156,6 +3156,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } + public boolean isConfiguredForClustering() { + return configuredForClustering; + } + /** * @return the DN of the Cluster Manager that we are currently connected to, if available. This will return null if the instance is not clustered or if the instance is clustered but the NCM's DN * is not available - for instance, if cluster communications are not secure @@ -3324,7 +3328,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R */ @Override public boolean isPrimary() { - return leaderElectionManager != null && leaderElectionManager.isLeader(ClusterRoles.PRIMARY_NODE); + return isClustered() && leaderElectionManager != null && leaderElectionManager.isLeader(ClusterRoles.PRIMARY_NODE); } public void setPrimary(final boolean primary) { @@ -3914,6 +3918,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return controllerServiceProvider.getControllerServiceIdentifiers(serviceType, groupId); } + @Override public ProvenanceRepository getProvenanceRepository() { return provenanceRepository; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java index b82cb3a05d..6d65604751 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java @@ -70,8 +70,8 @@ public class ContinuallyRunProcessorTask implements Callable { this.processContext = processContext; } - static boolean isRunOnCluster(final ProcessorNode procNode, final boolean isClustered, final boolean isPrimary) { - return !procNode.isIsolated() || !isClustered || isPrimary; + static boolean isRunOnCluster(final ProcessorNode procNode, FlowController flowController) { + return !procNode.isIsolated() || !flowController.isConfiguredForClustering() || flowController.isPrimary(); } static boolean isYielded(final ProcessorNode procNode) { @@ -90,7 +90,7 @@ public class ContinuallyRunProcessorTask implements Callable { } // make sure that either we're not clustered or this processor runs on all nodes or that this is the primary node - if (!isRunOnCluster(procNode, flowController.isClustered(), flowController.isPrimary())) { + if (!isRunOnCluster(procNode, flowController)) { return false; }