NIFI-401: - Minor tweaks to PR #1117. - Ensuring existing configuraiton is retained and shown until the user explicits changes it. - Retaining, but disabling, deprecated options.

This closes #1185
This closes #1117
This closes #512

Signed-off-by: jpercivall <JPercivall@apache.org>
This commit is contained in:
Matt Gilman 2016-11-04 11:03:35 -04:00 committed by jpercivall
parent 7eca2037bd
commit bff89f17b3
12 changed files with 131 additions and 131 deletions

View File

@ -16,7 +16,16 @@
*/ */
package org.apache.nifi.scheduling; package org.apache.nifi.scheduling;
/**
* Defines the Nodes where a given Component will be scheduled to run.
*/
public enum ExecutionNode { public enum ExecutionNode {
/**
* A Component will be scheduled to run on all nodes.
*/
ALL, ALL,
/**
* A Component will be scheduled to run on the primary node only.
*/
PRIMARY; PRIMARY;
} }

View File

@ -65,6 +65,7 @@ public enum SchedulingStrategy {
* Scheduling Strategy is used, the component will be scheduled in the same * Scheduling Strategy is used, the component will be scheduled in the same
* manner as if {@link TIMER_DRIVEN} were used. * manner as if {@link TIMER_DRIVEN} were used.
*/ */
@Deprecated
PRIMARY_NODE_ONLY(1, "0 sec"), PRIMARY_NODE_ONLY(1, "0 sec"),
/** /**
* Indicates that the component will be scheduled to run according to a * Indicates that the component will be scheduled to run according to a

View File

@ -529,6 +529,10 @@ The default value of `0 sec` means that the Processor should run as often as pos
for any time duration of 0, regardless of the time unit (i.e., `0 sec`, `0 mins`, `0 days`). For an explanation of values that are for any time duration of 0, regardless of the time unit (i.e., `0 sec`, `0 mins`, `0 days`). For an explanation of values that are
applicable for the CRON driven Scheduling Strategy, see the description of the CRON driven Scheduling Strategy itself. applicable for the CRON driven Scheduling Strategy, see the description of the CRON driven Scheduling Strategy itself.
When configured for clustering, an Execution setting will be available. This setting is used determine which node(s) the Processor will be
scheduled to execute. Selecting 'All Nodes' will result in this Processor being scheduled on every node in the cluster. Selecting
'Primary Node' will result in this Processor being scheduled on the Primary Node only.
The right-hand side of the tab contains a slider for choosing the `Run duration.' This controls how long the Processor should be scheduled The right-hand side of the tab contains a slider for choosing the `Run duration.' This controls how long the Processor should be scheduled
to run each time that it is triggered. On the left-hand side of the slider, it is marked `Lower latency' while the right-hand side to run each time that it is triggered. On the left-hand side of the slider, it is marked `Lower latency' while the right-hand side
is marked `Higher throughput.' When a Processor finishes running, it must update the repository in order to transfer the FlowFiles to is marked `Higher throughput.' When a Processor finishes running, it must update the repository in order to transfer the FlowFiles to

View File

@ -16,35 +16,7 @@
*/ */
package org.apache.nifi.controller; package org.apache.nifi.controller;
import static java.util.Objects.requireNonNull; import com.sun.jersey.api.client.ClientHandlerException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.net.ssl.SSLContext;
import org.apache.commons.collections4.Predicate; import org.apache.commons.collections4.Predicate;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action; import org.apache.nifi.action.Action;
@ -206,8 +178,8 @@ import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.reporting.Severity; import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.stream.io.LimitingInputStream; import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.ComponentIdGenerator; import org.apache.nifi.util.ComponentIdGenerator;
@ -235,7 +207,33 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.sun.jersey.api.client.ClientHandlerException; import javax.net.ssl.SSLContext;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static java.util.Objects.requireNonNull;
public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider { public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider, Authorizable, ProvenanceAuthorizableFactory, NodeTypeProvider {
@ -482,8 +480,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry); final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry);
final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry, this.nifiProperties); final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry, this.nifiProperties);
processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent); processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent);
// PRIMARY_NODE_ONLY is deprecated, but still exists to handle processors that are still defined with it (they haven't been // PRIMARY_NODE_ONLY is deprecated, but still exists to handle processors that are still defined with it (they haven't been re-configured with executeNode = PRIMARY).
// re-configured with executeNode = PRIMARY).
processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent); processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent);
processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, quartzSchedulingAgent); processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, quartzSchedulingAgent);
processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this, contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS); processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this, contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS);

View File

@ -111,7 +111,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
private final Map<Relationship, Set<Connection>> connections; private final Map<Relationship, Set<Connection>> connections;
private final AtomicReference<Set<Relationship>> undefinedRelationshipsToTerminate; private final AtomicReference<Set<Relationship>> undefinedRelationshipsToTerminate;
private final AtomicReference<List<Connection>> incomingConnectionsRef; private final AtomicReference<List<Connection>> incomingConnectionsRef;
private final AtomicBoolean isolated;
private final AtomicBoolean lossTolerant; private final AtomicBoolean lossTolerant;
private final AtomicReference<String> comments; private final AtomicReference<String> comments;
private final AtomicReference<Position> position; private final AtomicReference<Position> position;
@ -172,7 +171,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
style = new AtomicReference<>(Collections.unmodifiableMap(new HashMap<String, String>())); style = new AtomicReference<>(Collections.unmodifiableMap(new HashMap<String, String>()));
this.processGroup = new AtomicReference<>(); this.processGroup = new AtomicReference<>();
processScheduler = scheduler; processScheduler = scheduler;
isolated = new AtomicBoolean(false);
penalizationPeriod = new AtomicReference<>(DEFAULT_PENALIZATION_PERIOD); penalizationPeriod = new AtomicReference<>(DEFAULT_PENALIZATION_PERIOD);
this.nifiProperties = nifiProperties; this.nifiProperties = nifiProperties;
@ -266,7 +264,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override @Override
public boolean isIsolated() { public boolean isIsolated() {
return isolated.get(); return schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY || executionNode == ExecutionNode.PRIMARY;
} }
/** /**
@ -317,19 +315,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
this.lossTolerant.set(lossTolerant); this.lossTolerant.set(lossTolerant);
} }
/**
* Indicates whether the processor runs on only the primary node.
*
* @param isolated
* isolated
*/
public void setIsolated(final boolean isolated) {
if (isRunning()) {
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
}
this.isolated.set(isolated);
}
@Override @Override
public boolean isAutoTerminated(final Relationship relationship) { public boolean isAutoTerminated(final Relationship relationship) {
if (relationship.isAutoTerminated() && getConnections(relationship).isEmpty()) { if (relationship.isAutoTerminated() && getConnections(relationship).isEmpty()) {
@ -429,8 +414,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
} }
this.schedulingStrategy = schedulingStrategy; this.schedulingStrategy = schedulingStrategy;
// PRIMARY_NODE_ONLY is deprecated. Isolated is also set when executionNode == PRIMARY
setIsolated(schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY);
} }
/** /**
@ -483,7 +466,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override @Override
public void setExecutionNode(final ExecutionNode executionNode) { public void setExecutionNode(final ExecutionNode executionNode) {
this.executionNode = executionNode; this.executionNode = executionNode;
setIsolated(executionNode == ExecutionNode.PRIMARY);
} }
@Override @Override

View File

@ -368,7 +368,7 @@ public class ProcessorAuditor extends NiFiAuditor {
values.put(COMMENTS, processor.getComments()); values.put(COMMENTS, processor.getComments());
} }
if (newConfig.getSchedulingStrategy() != null) { if (newConfig.getSchedulingStrategy() != null) {
values.put(SCHEDULING_STRATEGY, processor.getSchedulingStrategy().toString()); values.put(SCHEDULING_STRATEGY, processor.getSchedulingStrategy().name());
} }
if (newConfig.getExecutionNode() != null) { if (newConfig.getExecutionNode() != null) {
values.put(EXECUTION_NODE, processor.getExecutionNode().name()); values.put(EXECUTION_NODE, processor.getExecutionNode().name());

View File

@ -108,18 +108,6 @@
</div> </div>
<div class="clear"></div> <div class="clear"></div>
</div> </div>
<div class="setting">
<div id="execution-node-container" class="execution-node-setting">
<div class="setting-name">
Execution node
<img class="setting-icon icon-info" src="images/iconInfo.png" alt="Info" title="The node(s) that will run this processor."/>
</div>
<div class="setting-field">
<div id="execution-node-combo"></div>
</div>
</div>
<div class="clear"></div>
</div>
<div id="timer-driven-options" class="setting"> <div id="timer-driven-options" class="setting">
<div class="concurrently-schedulable-tasks-setting"> <div class="concurrently-schedulable-tasks-setting">
<div class="setting-name"> <div class="setting-name">
@ -174,6 +162,18 @@
</div> </div>
<div class="clear"></div> <div class="clear"></div>
</div> </div>
<div id="execution-node-options" class="setting">
<div class="execution-node-setting">
<div class="setting-name">
Execution
<div class="fa fa-question-circle" alt="Info" title="The node(s) that this processor will be scheduled to run on."></div>
</div>
<div class="setting-field">
<div id="execution-node-combo"></div>
</div>
</div>
<div class="clear"></div>
</div>
</div> </div>
<div class="spacer">&nbsp;</div> <div class="spacer">&nbsp;</div>
<div id="run-duration-setting-container" class="settings-right"> <div id="run-duration-setting-container" class="settings-right">

View File

@ -102,18 +102,6 @@
</div> </div>
<div class="clear"></div> <div class="clear"></div>
</div> </div>
<div class="setting">
<div id="details-execution-node-container" class="execution-node-setting">
<div class="setting-name">
Execution node
<img class="setting-icon icon-info" src="images/iconInfo.png" alt="Info" title="The node(s) that will run this processor."/>
</div>
<div class="setting-field">
<span id="read-only-execution-node"></span>
</div>
</div>
<div class="clear"></div>
</div>
<div class="setting"> <div class="setting">
<div class="concurrently-schedulable-tasks-setting"> <div class="concurrently-schedulable-tasks-setting">
<div class="setting-name"> <div class="setting-name">
@ -135,6 +123,18 @@
</div> </div>
<div class="clear"></div> <div class="clear"></div>
</div> </div>
<div id="read-only-execution-node-options" class="setting">
<div class="execution-node-setting">
<div class="setting-name">
Execution
<div class="fa fa-question-circle" alt="Info" title="The node(s) that this processor will be scheduled to run on."></div>
</div>
<div class="setting-field">
<span id="read-only-execution-node"></span>
</div>
</div>
<div class="clear"></div>
</div>
</div> </div>
<div class="spacer">&nbsp;</div> <div class="spacer">&nbsp;</div>
<div class="settings-right"> <div class="settings-right">

View File

@ -55,12 +55,6 @@ div.processor-enabled-container {
width: 20%; width: 20%;
} }
#execution-node-combo {
width: 130px;
height: 18px;
line-height: 18px;
}
#event-driven-warning { #event-driven-warning {
padding-top: 22px; padding-top: 22px;
color: #f00; color: #f00;

View File

@ -25,7 +25,7 @@ nf.ProcessorConfiguration = (function () {
/** /**
* Gets the available scheduling strategies based on the specified processor. * Gets the available scheduling strategies based on the specified processor.
* *
* @param {type} processor * @param {object} processor
* @returns {Array} * @returns {Array}
*/ */
var getSchedulingStrategies = function (processor) { var getSchedulingStrategies = function (processor) {
@ -52,6 +52,16 @@ nf.ProcessorConfiguration = (function () {
}); });
} }
// conditionally support event driven
if (processor.config['schedulingStrategy'] === 'PRIMARY_NODE_ONLY') {
strategies.push({
text: 'On primary node',
value: 'PRIMARY_NODE_ONLY',
description: 'Processor will be scheduled on the primary node on an interval defined by the run schedule. This option has been deprecated, please use the Execution setting below.',
disabled: true
});
}
// add an option for cron driven // add an option for cron driven
strategies.push({ strategies.push({
text: 'CRON driven', text: 'CRON driven',
@ -62,6 +72,25 @@ nf.ProcessorConfiguration = (function () {
return strategies; return strategies;
}; };
/**
* Gets the available execution nodes based on the specified processor.
*
* @param {object} processor
* @returns {Array}
*/
var getExecutionNodeOptions = function (processor) {
return [{
text: 'All nodes',
value: 'ALL',
description: 'Processor will be scheduled to run on all nodes'
}, {
text: 'Primary node',
value: 'PRIMARY',
description: 'Processor will be scheduled to run only on the primary node',
disabled: !nf.Canvas.isClustered() && processor.config['executionNode'] === 'PRIMARY'
}];
};
/** /**
* Handle any expected processor configuration errors. * Handle any expected processor configuration errors.
* *
@ -516,19 +545,6 @@ nf.ProcessorConfiguration = (function () {
}] }]
}); });
// initialize the execution node combo
$('#execution-node-combo').combo({
options: [{
text: 'All nodes',
value: 'ALL',
description: 'Processor will be configured to run on all nodes'
}, {
text: 'Primary node only',
value: 'PRIMARY',
description: 'Processor will be configured to run only on the primary node'
}]
});
// initialize the run duration slider // initialize the run duration slider
$('#run-duration-slider').slider({ $('#run-duration-slider').slider({
min: 0, min: 0,
@ -630,16 +646,7 @@ nf.ProcessorConfiguration = (function () {
value: processor.config['bulletinLevel'] value: processor.config['bulletinLevel']
}); });
// If the scheduling strategy is PRIMARY_NODE_ONLY (deprecated),
// then set the execution node to PRIMARY and the scheduling
// strategy to TIMER. These new values will be saved when/if
// the dialog is applied.
var schedulingStrategy = processor.config['schedulingStrategy']; var schedulingStrategy = processor.config['schedulingStrategy'];
var executionNode = processor.config['executionNode'];
if (schedulingStrategy === 'PRIMARY_NODE_ONLY') {
executionNode = 'PRIMARY';
schedulingStrategy = 'TIMER_DRIVEN';
}
// initialize the scheduling strategy // initialize the scheduling strategy
$('#scheduling-strategy-combo').combo({ $('#scheduling-strategy-combo').combo({
@ -671,14 +678,21 @@ nf.ProcessorConfiguration = (function () {
} }
}); });
// select the execution node var executionNode = processor.config['executionNode'];
$('#execution-node-combo').combo('setSelectedOption', {
// initialize the execution node combo
$('#execution-node-combo').combo({
options: getExecutionNodeOptions(processor),
selectedOption: {
value: executionNode value: executionNode
}
}); });
if (nf.Canvas.isClustered()) {
$('#execution-node-container').show(); // show the execution node option if we're cluster or we're currently configured to run on the primary node only
if (nf.Canvas.isClustered() || executionNode === 'PRIMARY') {
$('#execution-node-options').show();
} else { } else {
$('#execution-node-container').hide(); $('#execution-node-options').hide();
} }
// initialize the concurrentTasks // initialize the concurrentTasks

View File

@ -560,9 +560,9 @@ nf.Common = (function () {
*/ */
populateField: function (target, value) { populateField: function (target, value) {
if (nf.Common.isUndefined(value) || nf.Common.isNull(value)) { if (nf.Common.isUndefined(value) || nf.Common.isNull(value)) {
return $('#' + target).addClass('unset').text('No value previously set'); return $('#' + target).addClass('unset').text('No value set');
} else if (value === '') { } else if (value === '') {
return $('#' + target).addClass('blank').text('Empty string previously set'); return $('#' + target).addClass('blank').text('Empty string set');
} else { } else {
return $('#' + target).text(value); return $('#' + target).text(value);
} }

View File

@ -158,9 +158,9 @@ nf.ProcessorDetails = (function () {
var showRunSchedule = true; var showRunSchedule = true;
// make the scheduling strategy human readable
var schedulingStrategy = details.config['schedulingStrategy']; var schedulingStrategy = details.config['schedulingStrategy'];
var executionNode = details.config['executionNode'];
// make the scheduling strategy human readable
if (schedulingStrategy === 'EVENT_DRIVEN') { if (schedulingStrategy === 'EVENT_DRIVEN') {
showRunSchedule = false; showRunSchedule = false;
schedulingStrategy = 'Event driven'; schedulingStrategy = 'Event driven';
@ -168,12 +168,8 @@ nf.ProcessorDetails = (function () {
schedulingStrategy = 'CRON driven'; schedulingStrategy = 'CRON driven';
} else if (schedulingStrategy === 'TIMER_DRIVEN') { } else if (schedulingStrategy === 'TIMER_DRIVEN') {
schedulingStrategy = "Timer driven"; schedulingStrategy = "Timer driven";
} else if (schedulingStrategy === 'PRIMARY_NODE_ONLY') { } else {
// PRIMARY_NODE_ONLY has been deprecated as a schedulingStrategy = "On primary node";
// schedulingStrategy option, and is now an
// executionNode option instead.
schedulingStrategy = "Timer driven";
executionNode = 'PRIMARY'
} }
nf.Common.populateField('read-only-scheduling-strategy', schedulingStrategy); nf.Common.populateField('read-only-scheduling-strategy', schedulingStrategy);
@ -184,17 +180,20 @@ nf.ProcessorDetails = (function () {
$('#read-only-run-schedule').hide(); $('#read-only-run-schedule').hide();
} }
var executionNode = details.config['executionNode'];
// only show the execution-node when applicable // only show the execution-node when applicable
if (nf.Canvas.isClustered() || executionNode === 'PRIMARY') {
if (executionNode === 'ALL') { if (executionNode === 'ALL') {
executionNode = "All nodes"; executionNode = "All nodes";
} else if (executionNode === 'PRIMARY') { } else if (executionNode === 'PRIMARY') {
executionNode = "Primary node only"; executionNode = "Primary node only";
} }
nf.Common.populateField('read-only-execution-node', executionNode); nf.Common.populateField('read-only-execution-node', executionNode);
if (nf.Canvas.isClustered()) {
$('#details-execution-node-container').show(); $('#read-only-execution-node-options').show();
} else { } else {
$('#details-execution-node-container').hide(); $('#read-only-execution-node-options').hide();
} }
// load the relationship list // load the relationship list