diff --git a/nifi-api/src/main/java/org/apache/nifi/scheduling/ExecutionNode.java b/nifi-api/src/main/java/org/apache/nifi/scheduling/ExecutionNode.java index f479732239..e41f7038c8 100644 --- a/nifi-api/src/main/java/org/apache/nifi/scheduling/ExecutionNode.java +++ b/nifi-api/src/main/java/org/apache/nifi/scheduling/ExecutionNode.java @@ -16,7 +16,16 @@ */ package org.apache.nifi.scheduling; +/** + * Defines the Nodes where a given Component will be scheduled to run. + */ public enum ExecutionNode { + /** + * A Component will be scheduled to run on all nodes. + */ ALL, + /** + * A Component will be scheduled to run on the primary node only. + */ PRIMARY; } diff --git a/nifi-api/src/main/java/org/apache/nifi/scheduling/SchedulingStrategy.java b/nifi-api/src/main/java/org/apache/nifi/scheduling/SchedulingStrategy.java index 21bf130a38..332cc45cd4 100644 --- a/nifi-api/src/main/java/org/apache/nifi/scheduling/SchedulingStrategy.java +++ b/nifi-api/src/main/java/org/apache/nifi/scheduling/SchedulingStrategy.java @@ -65,6 +65,7 @@ public enum SchedulingStrategy { * Scheduling Strategy is used, the component will be scheduled in the same * manner as if {@link TIMER_DRIVEN} were used. */ + @Deprecated PRIMARY_NODE_ONLY(1, "0 sec"), /** * Indicates that the component will be scheduled to run according to a diff --git a/nifi-docs/src/main/asciidoc/user-guide.adoc b/nifi-docs/src/main/asciidoc/user-guide.adoc index e5f71a5b35..844793ce35 100644 --- a/nifi-docs/src/main/asciidoc/user-guide.adoc +++ b/nifi-docs/src/main/asciidoc/user-guide.adoc @@ -529,6 +529,10 @@ The default value of `0 sec` means that the Processor should run as often as pos for any time duration of 0, regardless of the time unit (i.e., `0 sec`, `0 mins`, `0 days`). For an explanation of values that are applicable for the CRON driven Scheduling Strategy, see the description of the CRON driven Scheduling Strategy itself. +When configured for clustering, an Execution setting will be available. This setting is used determine which node(s) the Processor will be +scheduled to execute. Selecting 'All Nodes' will result in this Processor being scheduled on every node in the cluster. Selecting +'Primary Node' will result in this Processor being scheduled on the Primary Node only. + The right-hand side of the tab contains a slider for choosing the `Run duration.' This controls how long the Processor should be scheduled to run each time that it is triggered. On the left-hand side of the slider, it is marked `Lower latency' while the right-hand side is marked `Higher throughput.' When a Processor finishes running, it must update the repository in order to transfer the FlowFiles to diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 5f1d34cb82..ec8b288a4f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -16,35 +16,7 @@ */ package org.apache.nifi.controller; -import static java.util.Objects.requireNonNull; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import javax.net.ssl.SSLContext; - +import com.sun.jersey.api.client.ClientHandlerException; import org.apache.commons.collections4.Predicate; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; @@ -206,8 +178,8 @@ import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.reporting.Severity; -import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.scheduling.ExecutionNode; +import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.stream.io.LimitingInputStream; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.ComponentIdGenerator; @@ -235,7 +207,33 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.sun.jersey.api.client.ClientHandlerException; +import javax.net.ssl.SSLContext; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static java.util.Objects.requireNonNull; public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider { @@ -482,8 +480,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry); final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry, this.nifiProperties); processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent); - // PRIMARY_NODE_ONLY is deprecated, but still exists to handle processors that are still defined with it (they haven't been - // re-configured with executeNode = PRIMARY). + // PRIMARY_NODE_ONLY is deprecated, but still exists to handle processors that are still defined with it (they haven't been re-configured with executeNode = PRIMARY). processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent); processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, quartzSchedulingAgent); processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this, contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index a76557790a..27fdc0f705 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -111,7 +111,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable private final Map> connections; private final AtomicReference> undefinedRelationshipsToTerminate; private final AtomicReference> incomingConnectionsRef; - private final AtomicBoolean isolated; private final AtomicBoolean lossTolerant; private final AtomicReference comments; private final AtomicReference position; @@ -172,7 +171,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable style = new AtomicReference<>(Collections.unmodifiableMap(new HashMap())); this.processGroup = new AtomicReference<>(); processScheduler = scheduler; - isolated = new AtomicBoolean(false); penalizationPeriod = new AtomicReference<>(DEFAULT_PENALIZATION_PERIOD); this.nifiProperties = nifiProperties; @@ -266,7 +264,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public boolean isIsolated() { - return isolated.get(); + return schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY || executionNode == ExecutionNode.PRIMARY; } /** @@ -317,19 +315,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable this.lossTolerant.set(lossTolerant); } - /** - * Indicates whether the processor runs on only the primary node. - * - * @param isolated - * isolated - */ - public void setIsolated(final boolean isolated) { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } - this.isolated.set(isolated); - } - @Override public boolean isAutoTerminated(final Relationship relationship) { if (relationship.isAutoTerminated() && getConnections(relationship).isEmpty()) { @@ -429,8 +414,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } this.schedulingStrategy = schedulingStrategy; - // PRIMARY_NODE_ONLY is deprecated. Isolated is also set when executionNode == PRIMARY - setIsolated(schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY); } /** @@ -483,7 +466,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public void setExecutionNode(final ExecutionNode executionNode) { this.executionNode = executionNode; - setIsolated(executionNode == ExecutionNode.PRIMARY); } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessorAuditor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessorAuditor.java index d24ab859bf..915ee79367 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessorAuditor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessorAuditor.java @@ -368,7 +368,7 @@ public class ProcessorAuditor extends NiFiAuditor { values.put(COMMENTS, processor.getComments()); } if (newConfig.getSchedulingStrategy() != null) { - values.put(SCHEDULING_STRATEGY, processor.getSchedulingStrategy().toString()); + values.put(SCHEDULING_STRATEGY, processor.getSchedulingStrategy().name()); } if (newConfig.getExecutionNode() != null) { values.put(EXECUTION_NODE, processor.getExecutionNode().name()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/processor-configuration.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/processor-configuration.jsp index 438d591ea7..829be1ece8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/processor-configuration.jsp +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/processor-configuration.jsp @@ -108,18 +108,6 @@
-
-
-
- Execution node - Info -
-
-
-
-
-
-
@@ -174,6 +162,18 @@
+
+
+
+ Execution +
+
+
+
+
+
+
+
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/processor-details.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/processor-details.jsp index 3210a404b3..15d8cc3253 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/processor-details.jsp +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/processor-details.jsp @@ -102,18 +102,6 @@
-
-
-
- Execution node - Info -
-
- -
-
-
-
@@ -135,6 +123,18 @@
+
+
+
+ Execution +
+
+
+ +
+
+
+
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/processor-configuration.css b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/processor-configuration.css index c8b5843924..6e82b5e2a7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/processor-configuration.css +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/processor-configuration.css @@ -55,12 +55,6 @@ div.processor-enabled-container { width: 20%; } -#execution-node-combo { - width: 130px; - height: 18px; - line-height: 18px; -} - #event-driven-warning { padding-top: 22px; color: #f00; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor-configuration.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor-configuration.js index eb0df92967..a9a51f5cfa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor-configuration.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-processor-configuration.js @@ -25,7 +25,7 @@ nf.ProcessorConfiguration = (function () { /** * Gets the available scheduling strategies based on the specified processor. * - * @param {type} processor + * @param {object} processor * @returns {Array} */ var getSchedulingStrategies = function (processor) { @@ -52,6 +52,16 @@ nf.ProcessorConfiguration = (function () { }); } + // conditionally support event driven + if (processor.config['schedulingStrategy'] === 'PRIMARY_NODE_ONLY') { + strategies.push({ + text: 'On primary node', + value: 'PRIMARY_NODE_ONLY', + description: 'Processor will be scheduled on the primary node on an interval defined by the run schedule. This option has been deprecated, please use the Execution setting below.', + disabled: true + }); + } + // add an option for cron driven strategies.push({ text: 'CRON driven', @@ -62,6 +72,25 @@ nf.ProcessorConfiguration = (function () { return strategies; }; + /** + * Gets the available execution nodes based on the specified processor. + * + * @param {object} processor + * @returns {Array} + */ + var getExecutionNodeOptions = function (processor) { + return [{ + text: 'All nodes', + value: 'ALL', + description: 'Processor will be scheduled to run on all nodes' + }, { + text: 'Primary node', + value: 'PRIMARY', + description: 'Processor will be scheduled to run only on the primary node', + disabled: !nf.Canvas.isClustered() && processor.config['executionNode'] === 'PRIMARY' + }]; + }; + /** * Handle any expected processor configuration errors. * @@ -516,19 +545,6 @@ nf.ProcessorConfiguration = (function () { }] }); - // initialize the execution node combo - $('#execution-node-combo').combo({ - options: [{ - text: 'All nodes', - value: 'ALL', - description: 'Processor will be configured to run on all nodes' - }, { - text: 'Primary node only', - value: 'PRIMARY', - description: 'Processor will be configured to run only on the primary node' - }] - }); - // initialize the run duration slider $('#run-duration-slider').slider({ min: 0, @@ -630,16 +646,7 @@ nf.ProcessorConfiguration = (function () { value: processor.config['bulletinLevel'] }); - // If the scheduling strategy is PRIMARY_NODE_ONLY (deprecated), - // then set the execution node to PRIMARY and the scheduling - // strategy to TIMER. These new values will be saved when/if - // the dialog is applied. var schedulingStrategy = processor.config['schedulingStrategy']; - var executionNode = processor.config['executionNode']; - if (schedulingStrategy === 'PRIMARY_NODE_ONLY') { - executionNode = 'PRIMARY'; - schedulingStrategy = 'TIMER_DRIVEN'; - } // initialize the scheduling strategy $('#scheduling-strategy-combo').combo({ @@ -671,14 +678,21 @@ nf.ProcessorConfiguration = (function () { } }); - // select the execution node - $('#execution-node-combo').combo('setSelectedOption', { - value: executionNode + var executionNode = processor.config['executionNode']; + + // initialize the execution node combo + $('#execution-node-combo').combo({ + options: getExecutionNodeOptions(processor), + selectedOption: { + value: executionNode + } }); - if (nf.Canvas.isClustered()) { - $('#execution-node-container').show(); + + // show the execution node option if we're cluster or we're currently configured to run on the primary node only + if (nf.Canvas.isClustered() || executionNode === 'PRIMARY') { + $('#execution-node-options').show(); } else { - $('#execution-node-container').hide(); + $('#execution-node-options').hide(); } // initialize the concurrentTasks diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js index be9fae792f..9dcf7d3677 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-common.js @@ -560,9 +560,9 @@ nf.Common = (function () { */ populateField: function (target, value) { if (nf.Common.isUndefined(value) || nf.Common.isNull(value)) { - return $('#' + target).addClass('unset').text('No value previously set'); + return $('#' + target).addClass('unset').text('No value set'); } else if (value === '') { - return $('#' + target).addClass('blank').text('Empty string previously set'); + return $('#' + target).addClass('blank').text('Empty string set'); } else { return $('#' + target).text(value); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js index f94ece08ed..e5dd2a6cf5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/nf-processor-details.js @@ -158,9 +158,9 @@ nf.ProcessorDetails = (function () { var showRunSchedule = true; - // make the scheduling strategy human readable var schedulingStrategy = details.config['schedulingStrategy']; - var executionNode = details.config['executionNode']; + + // make the scheduling strategy human readable if (schedulingStrategy === 'EVENT_DRIVEN') { showRunSchedule = false; schedulingStrategy = 'Event driven'; @@ -168,12 +168,8 @@ nf.ProcessorDetails = (function () { schedulingStrategy = 'CRON driven'; } else if (schedulingStrategy === 'TIMER_DRIVEN') { schedulingStrategy = "Timer driven"; - } else if (schedulingStrategy === 'PRIMARY_NODE_ONLY') { - // PRIMARY_NODE_ONLY has been deprecated as a - // schedulingStrategy option, and is now an - // executionNode option instead. - schedulingStrategy = "Timer driven"; - executionNode = 'PRIMARY' + } else { + schedulingStrategy = "On primary node"; } nf.Common.populateField('read-only-scheduling-strategy', schedulingStrategy); @@ -184,17 +180,20 @@ nf.ProcessorDetails = (function () { $('#read-only-run-schedule').hide(); } + var executionNode = details.config['executionNode']; + // only show the execution-node when applicable - if (executionNode === 'ALL') { - executionNode = "All nodes"; - } else if (executionNode === 'PRIMARY') { - executionNode = "Primary node only"; - } - nf.Common.populateField('read-only-execution-node', executionNode); - if (nf.Canvas.isClustered()) { - $('#details-execution-node-container').show(); + if (nf.Canvas.isClustered() || executionNode === 'PRIMARY') { + if (executionNode === 'ALL') { + executionNode = "All nodes"; + } else if (executionNode === 'PRIMARY') { + executionNode = "Primary node only"; + } + nf.Common.populateField('read-only-execution-node', executionNode); + + $('#read-only-execution-node-options').show(); } else { - $('#details-execution-node-container').hide(); + $('#read-only-execution-node-options').hide(); } // load the relationship list