mirror of https://github.com/apache/nifi.git
NIFI-2305: Do not run processors that are marked as Primary Node Only if disconnected from cluster
This closes #667 Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
parent
f4d2919955
commit
41c0f19e48
|
@ -3044,7 +3044,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||||
* @throws IllegalStateException if not configured for clustering
|
* @throws IllegalStateException if not configured for clustering
|
||||||
*/
|
*/
|
||||||
public void startHeartbeating() throws IllegalStateException {
|
public void startHeartbeating() throws IllegalStateException {
|
||||||
if (!configuredForClustering) {
|
if (!isConfiguredForClustering()) {
|
||||||
throw new IllegalStateException("Unable to start heartbeating because heartbeating is not configured.");
|
throw new IllegalStateException("Unable to start heartbeating because heartbeating is not configured.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3082,7 +3082,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||||
* @throws IllegalStateException if not clustered
|
* @throws IllegalStateException if not clustered
|
||||||
*/
|
*/
|
||||||
public void stopHeartbeating() throws IllegalStateException {
|
public void stopHeartbeating() throws IllegalStateException {
|
||||||
if (!configuredForClustering) {
|
if (!isConfiguredForClustering()) {
|
||||||
throw new IllegalStateException("Unable to stop heartbeating because heartbeating is not configured.");
|
throw new IllegalStateException("Unable to stop heartbeating because heartbeating is not configured.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3156,6 +3156,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isConfiguredForClustering() {
|
||||||
|
return configuredForClustering;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the DN of the Cluster Manager that we are currently connected to, if available. This will return null if the instance is not clustered or if the instance is clustered but the NCM's DN
|
* @return the DN of the Cluster Manager that we are currently connected to, if available. This will return null if the instance is not clustered or if the instance is clustered but the NCM's DN
|
||||||
* is not available - for instance, if cluster communications are not secure
|
* is not available - for instance, if cluster communications are not secure
|
||||||
|
@ -3324,7 +3328,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public boolean isPrimary() {
|
public boolean isPrimary() {
|
||||||
return leaderElectionManager != null && leaderElectionManager.isLeader(ClusterRoles.PRIMARY_NODE);
|
return isClustered() && leaderElectionManager != null && leaderElectionManager.isLeader(ClusterRoles.PRIMARY_NODE);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setPrimary(final boolean primary) {
|
public void setPrimary(final boolean primary) {
|
||||||
|
@ -3914,6 +3918,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||||
return controllerServiceProvider.getControllerServiceIdentifiers(serviceType, groupId);
|
return controllerServiceProvider.getControllerServiceIdentifiers(serviceType, groupId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public ProvenanceRepository getProvenanceRepository() {
|
public ProvenanceRepository getProvenanceRepository() {
|
||||||
return provenanceRepository;
|
return provenanceRepository;
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,8 +70,8 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
|
||||||
this.processContext = processContext;
|
this.processContext = processContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
static boolean isRunOnCluster(final ProcessorNode procNode, final boolean isClustered, final boolean isPrimary) {
|
static boolean isRunOnCluster(final ProcessorNode procNode, FlowController flowController) {
|
||||||
return !procNode.isIsolated() || !isClustered || isPrimary;
|
return !procNode.isIsolated() || !flowController.isConfiguredForClustering() || flowController.isPrimary();
|
||||||
}
|
}
|
||||||
|
|
||||||
static boolean isYielded(final ProcessorNode procNode) {
|
static boolean isYielded(final ProcessorNode procNode) {
|
||||||
|
@ -90,7 +90,7 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// make sure that either we're not clustered or this processor runs on all nodes or that this is the primary node
|
// make sure that either we're not clustered or this processor runs on all nodes or that this is the primary node
|
||||||
if (!isRunOnCluster(procNode, flowController.isClustered(), flowController.isPrimary())) {
|
if (!isRunOnCluster(procNode, flowController)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue