diff --git a/nifi-api/src/main/java/org/apache/nifi/scheduling/ExecutionNode.java b/nifi-api/src/main/java/org/apache/nifi/scheduling/ExecutionNode.java
new file mode 100644
index 0000000000..f479732239
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/scheduling/ExecutionNode.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.scheduling;
+
+public enum ExecutionNode {
+ ALL,
+ PRIMARY;
+}
diff --git a/nifi-api/src/main/java/org/apache/nifi/scheduling/SchedulingStrategy.java b/nifi-api/src/main/java/org/apache/nifi/scheduling/SchedulingStrategy.java
index ccf4281e5e..21bf130a38 100644
--- a/nifi-api/src/main/java/org/apache/nifi/scheduling/SchedulingStrategy.java
+++ b/nifi-api/src/main/java/org/apache/nifi/scheduling/SchedulingStrategy.java
@@ -56,6 +56,10 @@ public enum SchedulingStrategy {
*/
TIMER_DRIVEN(1, "0 sec"),
/**
+ * NOTE: This option has been deprecated with the addition of the
+ * execution-node combo box. It still exists for backward compatibility
+ * with existing flows that still have this value for schedulingStrategy.
+ **
* Indicates that the component will be scheduled via timer only on the
* Primary Node. If the instance is not part of a cluster and this
* Scheduling Strategy is used, the component will be scheduled in the same
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java
index a9006c9bf6..1e4e92f777 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java
@@ -34,6 +34,7 @@ public class ProcessorConfigDTO {
// settings
private String schedulingPeriod;
private String schedulingStrategy;
+ private String executionNode;
private String penaltyDuration;
private String yieldDuration;
private String bulletinLevel;
@@ -86,6 +87,22 @@ public class ProcessorConfigDTO {
this.schedulingStrategy = schedulingStrategy;
}
+ /**
+ * Indicates which node the process should run on
+ *
+ * @return execution node
+ */
+ @ApiModelProperty(
+ value = "Indicates the node where the process will execute."
+ )
+ public String getExecutionNode() {
+ return executionNode;
+ }
+
+ public void setExecutionNode(String executionNode) {
+ this.executionNode = executionNode;
+ }
+
/**
* @return the amount of time that is used when this processor penalizes a flowfile
*/
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index 08b4abe34c..8f405e04a4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -35,6 +35,7 @@ import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.scheduling.ExecutionNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,6 +88,10 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen
@Override
public abstract SchedulingStrategy getSchedulingStrategy();
+ public abstract void setExecutionNode(ExecutionNode executionNode);
+
+ public abstract ExecutionNode getExecutionNode();
+
public abstract void setRunDuration(long duration, TimeUnit timeUnit);
public abstract long getRunDuration(TimeUnit timeUnit);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index ba5ed36488..5f1d34cb82 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -207,6 +207,7 @@ import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.ComponentIdGenerator;
@@ -481,6 +482,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry);
final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry, this.nifiProperties);
processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent);
+ // PRIMARY_NODE_ONLY is deprecated, but still exists to handle processors that are still defined with it (they haven't been
+ // re-configured with executeNode = PRIMARY).
processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent);
processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, quartzSchedulingAgent);
processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this, contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS);
@@ -1709,6 +1712,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy()));
}
+ if (config.getExecutionNode() != null) {
+ procNode.setExecutionNode(ExecutionNode.valueOf(config.getExecutionNode()));
+ }
+
// ensure that the scheduling strategy is set prior to these values
procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
procNode.setScheduldingPeriod(config.getSchedulingPeriod());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 8cfb3f3371..1d7ebdea12 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -88,6 +88,7 @@ import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.util.DomUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.file.FileUtils;
@@ -902,6 +903,10 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy()));
}
+ if (config.getExecutionNode() != null) {
+ procNode.setExecutionNode(ExecutionNode.valueOf(config.getExecutionNode()));
+ }
+
// must set scheduling strategy before these two
procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
procNode.setScheduldingPeriod(config.getSchedulingPeriod());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 5ff9f2201d..a76557790a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -56,6 +56,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
@@ -134,6 +135,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
private SchedulingStrategy schedulingStrategy; // guarded by read/write lock
// ??????? NOT any more
+ private ExecutionNode executionNode;
public StandardProcessorNode(final Processor processor, final String uuid,
final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
@@ -190,6 +192,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
}
schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN;
+ executionNode = ExecutionNode.ALL;
}
/**
@@ -426,6 +429,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
}
this.schedulingStrategy = schedulingStrategy;
+ // PRIMARY_NODE_ONLY is deprecated. Isolated is also set when executionNode == PRIMARY
setIsolated(schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY);
}
@@ -476,6 +480,17 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
this.schedulingPeriod.set(schedulingPeriod);
}
+ @Override
+ public void setExecutionNode(final ExecutionNode executionNode) {
+ this.executionNode = executionNode;
+ setIsolated(executionNode == ExecutionNode.PRIMARY);
+ }
+
+ @Override
+ public ExecutionNode getExecutionNode() {
+ return this.executionNode;
+ }
+
@Override
public long getRunDuration(final TimeUnit timeUnit) {
return timeUnit.convert(this.runNanos, TimeUnit.NANOSECONDS);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
index 2c51e96f27..f1e4232c04 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
@@ -32,6 +32,7 @@ import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.util.DomUtils;
import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
@@ -374,6 +375,14 @@ public class FlowFromDOMFactory {
configDto.setSchedulingStrategy(schedulingStrategyName.trim());
}
+ // handle execution node
+ final String executionNode = getString(element, "executionNode");
+ if (executionNode == null || executionNode.trim().isEmpty()) {
+ configDto.setExecutionNode(ExecutionNode.ALL.name());
+ } else {
+ configDto.setExecutionNode(executionNode.trim());
+ }
+
final Long runDurationNanos = getOptionalLong(element, "runDurationNanos");
if (runDurationNanos != null) {
configDto.setRunDurationMillis(TimeUnit.NANOSECONDS.toMillis(runDurationNanos));
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
index d04433c7b8..0ead668f23 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
@@ -344,6 +344,7 @@ public class StandardFlowSerializer implements FlowSerializer {
addTextElement(element, "lossTolerant", String.valueOf(processor.isLossTolerant()));
addTextElement(element, "scheduledState", processor.getScheduledState().name());
addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name());
+ addTextElement(element, "executionNode", processor.getExecutionNode().name());
addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS));
addConfiguration(element, processor.getProperties(), processor.getAnnotationData(), encryptor);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
index f3a4cbb732..0ab39a2ce3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
@@ -519,6 +519,7 @@ public class FingerprintFactory {
builder.append(config.getComments());
builder.append(config.getSchedulingPeriod());
builder.append(config.getSchedulingStrategy());
+ builder.append(config.getExecutionNode());
builder.append(config.getYieldDuration());
builder.append(config.getConcurrentlySchedulableTaskCount());
builder.append(config.getPenaltyDuration());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
index c1e66e59b7..adc83be4cb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
@@ -87,6 +87,8 @@
+
+