Signed-off-by: jpercivall <JPercivall@apache.org>
This commit is contained in:
Brian Eugley 2016-06-09 11:32:15 -04:00 committed by jpercivall
parent 4b4e099f2e
commit 7eca2037bd
21 changed files with 220 additions and 28 deletions

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.scheduling;
public enum ExecutionNode {
ALL,
PRIMARY;
}

View File

@ -56,6 +56,10 @@ public enum SchedulingStrategy {
*/ */
TIMER_DRIVEN(1, "0 sec"), TIMER_DRIVEN(1, "0 sec"),
/** /**
* NOTE: This option has been deprecated with the addition of the
* execution-node combo box. It still exists for backward compatibility
* with existing flows that still have this value for schedulingStrategy.
**
* Indicates that the component will be scheduled via timer only on the * Indicates that the component will be scheduled via timer only on the
* Primary Node. If the instance is not part of a cluster and this * Primary Node. If the instance is not part of a cluster and this
* Scheduling Strategy is used, the component will be scheduled in the same * Scheduling Strategy is used, the component will be scheduled in the same

View File

@ -34,6 +34,7 @@ public class ProcessorConfigDTO {
// settings // settings
private String schedulingPeriod; private String schedulingPeriod;
private String schedulingStrategy; private String schedulingStrategy;
private String executionNode;
private String penaltyDuration; private String penaltyDuration;
private String yieldDuration; private String yieldDuration;
private String bulletinLevel; private String bulletinLevel;
@ -86,6 +87,22 @@ public class ProcessorConfigDTO {
this.schedulingStrategy = schedulingStrategy; this.schedulingStrategy = schedulingStrategy;
} }
/**
* Indicates which node the process should run on
*
* @return execution node
*/
@ApiModelProperty(
value = "Indicates the node where the process will execute."
)
public String getExecutionNode() {
return executionNode;
}
public void setExecutionNode(String executionNode) {
this.executionNode = executionNode;
}
/** /**
* @return the amount of time that is used when this processor penalizes a flowfile * @return the amount of time that is used when this processor penalizes a flowfile
*/ */

View File

@ -35,6 +35,7 @@ import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.scheduling.ExecutionNode;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -87,6 +88,10 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen
@Override @Override
public abstract SchedulingStrategy getSchedulingStrategy(); public abstract SchedulingStrategy getSchedulingStrategy();
public abstract void setExecutionNode(ExecutionNode executionNode);
public abstract ExecutionNode getExecutionNode();
public abstract void setRunDuration(long duration, TimeUnit timeUnit); public abstract void setRunDuration(long duration, TimeUnit timeUnit);
public abstract long getRunDuration(TimeUnit timeUnit); public abstract long getRunDuration(TimeUnit timeUnit);

View File

@ -207,6 +207,7 @@ import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.reporting.Severity; import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.stream.io.LimitingInputStream; import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.ComponentIdGenerator; import org.apache.nifi.util.ComponentIdGenerator;
@ -481,6 +482,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry); final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry);
final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry, this.nifiProperties); final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry, this.nifiProperties);
processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent); processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent);
// PRIMARY_NODE_ONLY is deprecated, but still exists to handle processors that are still defined with it (they haven't been
// re-configured with executeNode = PRIMARY).
processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent); processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent);
processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, quartzSchedulingAgent); processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, quartzSchedulingAgent);
processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this, contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS); processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this, contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS);
@ -1709,6 +1712,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy())); procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy()));
} }
if (config.getExecutionNode() != null) {
procNode.setExecutionNode(ExecutionNode.valueOf(config.getExecutionNode()));
}
// ensure that the scheduling strategy is set prior to these values // ensure that the scheduling strategy is set prior to these values
procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount()); procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
procNode.setScheduldingPeriod(config.getSchedulingPeriod()); procNode.setScheduldingPeriod(config.getSchedulingPeriod());

View File

@ -88,6 +88,7 @@ import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.Severity; import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.util.DomUtils; import org.apache.nifi.util.DomUtils;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.file.FileUtils; import org.apache.nifi.util.file.FileUtils;
@ -902,6 +903,10 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy())); procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy()));
} }
if (config.getExecutionNode() != null) {
procNode.setExecutionNode(ExecutionNode.valueOf(config.getExecutionNode()));
}
// must set scheduling strategy before these two // must set scheduling strategy before these two
procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount()); procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
procNode.setScheduldingPeriod(config.getSchedulingPeriod()); procNode.setScheduldingPeriod(config.getSchedulingPeriod());

View File

@ -56,6 +56,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.util.ReflectionUtils;
@ -134,6 +135,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
private SchedulingStrategy schedulingStrategy; // guarded by read/write lock private SchedulingStrategy schedulingStrategy; // guarded by read/write lock
// ??????? NOT any more // ??????? NOT any more
private ExecutionNode executionNode;
public StandardProcessorNode(final Processor processor, final String uuid, public StandardProcessorNode(final Processor processor, final String uuid,
final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
@ -190,6 +192,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
} }
schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN; schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN;
executionNode = ExecutionNode.ALL;
} }
/** /**
@ -426,6 +429,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
} }
this.schedulingStrategy = schedulingStrategy; this.schedulingStrategy = schedulingStrategy;
// PRIMARY_NODE_ONLY is deprecated. Isolated is also set when executionNode == PRIMARY
setIsolated(schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY); setIsolated(schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY);
} }
@ -476,6 +480,17 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
this.schedulingPeriod.set(schedulingPeriod); this.schedulingPeriod.set(schedulingPeriod);
} }
@Override
public void setExecutionNode(final ExecutionNode executionNode) {
this.executionNode = executionNode;
setIsolated(executionNode == ExecutionNode.PRIMARY);
}
@Override
public ExecutionNode getExecutionNode() {
return this.executionNode;
}
@Override @Override
public long getRunDuration(final TimeUnit timeUnit) { public long getRunDuration(final TimeUnit timeUnit) {
return timeUnit.convert(this.runNanos, TimeUnit.NANOSECONDS); return timeUnit.convert(this.runNanos, TimeUnit.NANOSECONDS);

View File

@ -32,6 +32,7 @@ import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor; import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.util.DomUtils; import org.apache.nifi.util.DomUtils;
import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO; import org.apache.nifi.web.api.dto.ConnectionDTO;
@ -374,6 +375,14 @@ public class FlowFromDOMFactory {
configDto.setSchedulingStrategy(schedulingStrategyName.trim()); configDto.setSchedulingStrategy(schedulingStrategyName.trim());
} }
// handle execution node
final String executionNode = getString(element, "executionNode");
if (executionNode == null || executionNode.trim().isEmpty()) {
configDto.setExecutionNode(ExecutionNode.ALL.name());
} else {
configDto.setExecutionNode(executionNode.trim());
}
final Long runDurationNanos = getOptionalLong(element, "runDurationNanos"); final Long runDurationNanos = getOptionalLong(element, "runDurationNanos");
if (runDurationNanos != null) { if (runDurationNanos != null) {
configDto.setRunDurationMillis(TimeUnit.NANOSECONDS.toMillis(runDurationNanos)); configDto.setRunDurationMillis(TimeUnit.NANOSECONDS.toMillis(runDurationNanos));

View File

@ -344,6 +344,7 @@ public class StandardFlowSerializer implements FlowSerializer {
addTextElement(element, "lossTolerant", String.valueOf(processor.isLossTolerant())); addTextElement(element, "lossTolerant", String.valueOf(processor.isLossTolerant()));
addTextElement(element, "scheduledState", processor.getScheduledState().name()); addTextElement(element, "scheduledState", processor.getScheduledState().name());
addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name()); addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name());
addTextElement(element, "executionNode", processor.getExecutionNode().name());
addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS)); addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS));
addConfiguration(element, processor.getProperties(), processor.getAnnotationData(), encryptor); addConfiguration(element, processor.getProperties(), processor.getAnnotationData(), encryptor);

View File

@ -519,6 +519,7 @@ public class FingerprintFactory {
builder.append(config.getComments()); builder.append(config.getComments());
builder.append(config.getSchedulingPeriod()); builder.append(config.getSchedulingPeriod());
builder.append(config.getSchedulingStrategy()); builder.append(config.getSchedulingStrategy());
builder.append(config.getExecutionNode());
builder.append(config.getYieldDuration()); builder.append(config.getYieldDuration());
builder.append(config.getConcurrentlySchedulableTaskCount()); builder.append(config.getConcurrentlySchedulableTaskCount());
builder.append(config.getPenaltyDuration()); builder.append(config.getPenaltyDuration());

View File

@ -87,6 +87,8 @@
<xs:element name="schedulingStrategy" type="SchedulingStrategy" minOccurs="0" maxOccurs="1" /> <xs:element name="schedulingStrategy" type="SchedulingStrategy" minOccurs="0" maxOccurs="1" />
<xs:element name="executionNode" type="ExecutionNode" minOccurs="0" maxOccurs="1" />
<xs:element name="runDurationNanos" type="xs:long" minOccurs="0" maxOccurs="1" /> <xs:element name="runDurationNanos" type="xs:long" minOccurs="0" maxOccurs="1" />
<!-- properties that must be valid for the processor to execute. <!-- properties that must be valid for the processor to execute.
@ -337,6 +339,13 @@
</xs:restriction> </xs:restriction>
</xs:simpleType> </xs:simpleType>
<xs:simpleType name="ExecutionNode">
<xs:restriction base="xs:string">
<xs:enumeration value="ALL"></xs:enumeration>
<xs:enumeration value="PRIMARY"></xs:enumeration>
</xs:restriction>
</xs:simpleType>
<xs:complexType name="ControllerServicesType"> <xs:complexType name="ControllerServicesType">
<xs:sequence> <xs:sequence>
<xs:element name="controllerService" type="ControllerServiceType" minOccurs="0" maxOccurs="unbounded" /> <xs:element name="controllerService" type="ControllerServiceType" minOccurs="0" maxOccurs="unbounded" />

View File

@ -68,6 +68,7 @@ public class ProcessorAuditor extends NiFiAuditor {
private static final String AUTO_TERMINATED_RELATIONSHIPS = "Auto Terminated Relationships"; private static final String AUTO_TERMINATED_RELATIONSHIPS = "Auto Terminated Relationships";
private static final String SCHEDULING_PERIOD = "Run Schedule"; private static final String SCHEDULING_PERIOD = "Run Schedule";
private static final String SCHEDULING_STRATEGY = "Scheduling Strategy"; private static final String SCHEDULING_STRATEGY = "Scheduling Strategy";
private static final String EXECUTION_NODE = "Execution Node";
/** /**
* Audits the creation of processors via createProcessor(). * Audits the creation of processors via createProcessor().
@ -369,6 +370,9 @@ public class ProcessorAuditor extends NiFiAuditor {
if (newConfig.getSchedulingStrategy() != null) { if (newConfig.getSchedulingStrategy() != null) {
values.put(SCHEDULING_STRATEGY, processor.getSchedulingStrategy().toString()); values.put(SCHEDULING_STRATEGY, processor.getSchedulingStrategy().toString());
} }
if (newConfig.getExecutionNode() != null) {
values.put(EXECUTION_NODE, processor.getExecutionNode().name());
}
} }
return values; return values;

View File

@ -2492,6 +2492,7 @@ public final class DtoFactory {
dto.setComments(procNode.getComments()); dto.setComments(procNode.getComments());
dto.setBulletinLevel(procNode.getBulletinLevel().name()); dto.setBulletinLevel(procNode.getBulletinLevel().name());
dto.setSchedulingStrategy(procNode.getSchedulingStrategy().name()); dto.setSchedulingStrategy(procNode.getSchedulingStrategy().name());
dto.setExecutionNode(procNode.getExecutionNode().name());
dto.setAnnotationData(procNode.getAnnotationData()); dto.setAnnotationData(procNode.getAnnotationData());
// set up the default values for concurrent tasks and scheduling period // set up the default values for concurrent tasks and scheduling period
@ -2670,6 +2671,7 @@ public final class DtoFactory {
copy.setAutoTerminatedRelationships(copy(original.getAutoTerminatedRelationships())); copy.setAutoTerminatedRelationships(copy(original.getAutoTerminatedRelationships()));
copy.setComments(original.getComments()); copy.setComments(original.getComments());
copy.setSchedulingStrategy(original.getSchedulingStrategy()); copy.setSchedulingStrategy(original.getSchedulingStrategy());
copy.setExecutionNode(original.getExecutionNode());
copy.setConcurrentlySchedulableTaskCount(original.getConcurrentlySchedulableTaskCount()); copy.setConcurrentlySchedulableTaskCount(original.getConcurrentlySchedulableTaskCount());
copy.setCustomUiUrl(original.getCustomUiUrl()); copy.setCustomUiUrl(original.getCustomUiUrl());
copy.setDescriptors(copy(original.getDescriptors())); copy.setDescriptors(copy(original.getDescriptors()));

View File

@ -82,6 +82,7 @@ import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.search.SearchContext; import org.apache.nifi.search.SearchContext;
import org.apache.nifi.search.SearchResult; import org.apache.nifi.search.SearchResult;
import org.apache.nifi.search.Searchable; import org.apache.nifi.search.Searchable;
@ -1589,9 +1590,15 @@ public class ControllerFacade implements Authorizable {
} else if (SchedulingStrategy.TIMER_DRIVEN.equals(procNode.getSchedulingStrategy()) && StringUtils.containsIgnoreCase("timer", searchStr)) { } else if (SchedulingStrategy.TIMER_DRIVEN.equals(procNode.getSchedulingStrategy()) && StringUtils.containsIgnoreCase("timer", searchStr)) {
matches.add("Scheduling strategy: Timer driven"); matches.add("Scheduling strategy: Timer driven");
} else if (SchedulingStrategy.PRIMARY_NODE_ONLY.equals(procNode.getSchedulingStrategy()) && StringUtils.containsIgnoreCase("primary", searchStr)) { } else if (SchedulingStrategy.PRIMARY_NODE_ONLY.equals(procNode.getSchedulingStrategy()) && StringUtils.containsIgnoreCase("primary", searchStr)) {
// PRIMARY_NODE_ONLY has been deprecated as a SchedulingStrategy and replaced by PRIMARY as an ExecutionNode.
matches.add("Scheduling strategy: On primary node"); matches.add("Scheduling strategy: On primary node");
} }
// consider execution node
if (ExecutionNode.PRIMARY.equals(procNode.getExecutionNode()) && StringUtils.containsIgnoreCase("primary", searchStr)) {
matches.add("Execution node: primary");
}
// consider scheduled state // consider scheduled state
if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) { if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) {
if (StringUtils.containsIgnoreCase("disabled", searchStr)) { if (StringUtils.containsIgnoreCase("disabled", searchStr)) {

View File

@ -31,6 +31,7 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.LogLevel; import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.web.NiFiCoreException; import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.ResourceNotFoundException;
@ -117,6 +118,7 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
if (isNotNull(config)) { if (isNotNull(config)) {
// perform the configuration // perform the configuration
final String schedulingStrategy = config.getSchedulingStrategy(); final String schedulingStrategy = config.getSchedulingStrategy();
final String executionNode = config.getExecutionNode();
final String comments = config.getComments(); final String comments = config.getComments();
final String annotationData = config.getAnnotationData(); final String annotationData = config.getAnnotationData();
final Integer maxTasks = config.getConcurrentlySchedulableTaskCount(); final Integer maxTasks = config.getConcurrentlySchedulableTaskCount();
@ -133,6 +135,9 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
processor.setSchedulingStrategy(SchedulingStrategy.valueOf(schedulingStrategy)); processor.setSchedulingStrategy(SchedulingStrategy.valueOf(schedulingStrategy));
} }
if (isNotNull(executionNode)) {
processor.setExecutionNode(ExecutionNode.valueOf(executionNode));
}
if (isNotNull(comments)) { if (isNotNull(comments)) {
processor.setComments(comments); processor.setComments(comments);
} }
@ -211,6 +216,13 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
validationErrors.add(String.format("Bulletin level: Value must be one of [%s]", StringUtils.join(LogLevel.values(), ", "))); validationErrors.add(String.format("Bulletin level: Value must be one of [%s]", StringUtils.join(LogLevel.values(), ", ")));
} }
} }
if (isNotNull(config.getExecutionNode())) {
try {
ExecutionNode.valueOf(config.getExecutionNode());
} catch (IllegalArgumentException iae) {
validationErrors.add(String.format("Execution node: Value must be one of [%s]", StringUtils.join(ExecutionNode.values(), ", ")));
}
}
// get the current scheduling strategy // get the current scheduling strategy
SchedulingStrategy schedulingStrategy = processorNode.getSchedulingStrategy(); SchedulingStrategy schedulingStrategy = processorNode.getSchedulingStrategy();
@ -345,6 +357,7 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
configDTO.getProperties(), configDTO.getProperties(),
configDTO.getSchedulingPeriod(), configDTO.getSchedulingPeriod(),
configDTO.getSchedulingStrategy(), configDTO.getSchedulingStrategy(),
configDTO.getExecutionNode(),
configDTO.getYieldDuration())) { configDTO.getYieldDuration())) {
modificationRequest = true; modificationRequest = true;

View File

@ -108,6 +108,18 @@
</div> </div>
<div class="clear"></div> <div class="clear"></div>
</div> </div>
<div class="setting">
<div id="execution-node-container" class="execution-node-setting">
<div class="setting-name">
Execution node
<img class="setting-icon icon-info" src="images/iconInfo.png" alt="Info" title="The node(s) that will run this processor."/>
</div>
<div class="setting-field">
<div id="execution-node-combo"></div>
</div>
</div>
<div class="clear"></div>
</div>
<div id="timer-driven-options" class="setting"> <div id="timer-driven-options" class="setting">
<div class="concurrently-schedulable-tasks-setting"> <div class="concurrently-schedulable-tasks-setting">
<div class="setting-name"> <div class="setting-name">

View File

@ -102,6 +102,18 @@
</div> </div>
<div class="clear"></div> <div class="clear"></div>
</div> </div>
<div class="setting">
<div id="details-execution-node-container" class="execution-node-setting">
<div class="setting-name">
Execution node
<img class="setting-icon icon-info" src="images/iconInfo.png" alt="Info" title="The node(s) that will run this processor."/>
</div>
<div class="setting-field">
<span id="read-only-execution-node"></span>
</div>
</div>
<div class="clear"></div>
</div>
<div class="setting"> <div class="setting">
<div class="concurrently-schedulable-tasks-setting"> <div class="concurrently-schedulable-tasks-setting">
<div class="setting-name"> <div class="setting-name">

View File

@ -55,6 +55,12 @@ div.processor-enabled-container {
width: 20%; width: 20%;
} }
#execution-node-combo {
width: 130px;
height: 18px;
line-height: 18px;
}
#event-driven-warning { #event-driven-warning {
padding-top: 22px; padding-top: 22px;
color: #f00; color: #f00;

View File

@ -50,7 +50,7 @@
width: 340px; width: 340px;
} }
div.concurrently-schedulable-tasks-setting, div.scheduling-period-setting, div.penalty-duration-setting, div.yield-duration-setting, div.scheduling-strategy-setting, div.bulletin-setting { div.concurrently-schedulable-tasks-setting, div.scheduling-period-setting, div.penalty-duration-setting, div.yield-duration-setting, div.scheduling-strategy-setting, div.execution-node-setting, div.bulletin-setting {
float: left; float: left;
width: 40%; width: 40%;
} }

View File

@ -52,22 +52,6 @@ nf.ProcessorConfiguration = (function () {
}); });
} }
// conditionally support event driven
if (nf.Canvas.isClustered()) {
strategies.push({
text: 'On primary node',
value: 'PRIMARY_NODE_ONLY',
description: 'Processor will be scheduled on the primary node on an interval defined by the run schedule.'
});
} else if (processor.config['schedulingStrategy'] === 'PRIMARY_NODE_ONLY') {
strategies.push({
text: 'On primary node',
value: 'PRIMARY_NODE_ONLY',
description: 'Processor will be scheduled on the primary node on an interval defined by the run schedule.',
disabled: true
});
}
// add an option for cron driven // add an option for cron driven
strategies.push({ strategies.push({
text: 'CRON driven', text: 'CRON driven',
@ -200,6 +184,9 @@ nf.ProcessorConfiguration = (function () {
return true; return true;
} }
if ($('#execution-node-combo').combo('getSelectedOption').value !== (details.config['executionNode'] + '')) {
return true;
}
if ($('#processor-name').val() !== details['name']) { if ($('#processor-name').val() !== details['name']) {
return true; return true;
} }
@ -263,6 +250,7 @@ nf.ProcessorConfiguration = (function () {
processorConfigDto['schedulingPeriod'] = schedulingPeriod.val(); processorConfigDto['schedulingPeriod'] = schedulingPeriod.val();
} }
processorConfigDto['executionNode'] = $('#execution-node-combo').combo('getSelectedOption').value;
processorConfigDto['penaltyDuration'] = $('#penalty-duration').val(); processorConfigDto['penaltyDuration'] = $('#penalty-duration').val();
processorConfigDto['yieldDuration'] = $('#yield-duration').val(); processorConfigDto['yieldDuration'] = $('#yield-duration').val();
processorConfigDto['bulletinLevel'] = $('#bulletin-level-combo').combo('getSelectedOption').value; processorConfigDto['bulletinLevel'] = $('#bulletin-level-combo').combo('getSelectedOption').value;
@ -528,6 +516,19 @@ nf.ProcessorConfiguration = (function () {
}] }]
}); });
// initialize the execution node combo
$('#execution-node-combo').combo({
options: [{
text: 'All nodes',
value: 'ALL',
description: 'Processor will be configured to run on all nodes'
}, {
text: 'Primary node only',
value: 'PRIMARY',
description: 'Processor will be configured to run only on the primary node'
}]
});
// initialize the run duration slider // initialize the run duration slider
$('#run-duration-slider').slider({ $('#run-duration-slider').slider({
min: 0, min: 0,
@ -629,11 +630,22 @@ nf.ProcessorConfiguration = (function () {
value: processor.config['bulletinLevel'] value: processor.config['bulletinLevel']
}); });
// If the scheduling strategy is PRIMARY_NODE_ONLY (deprecated),
// then set the execution node to PRIMARY and the scheduling
// strategy to TIMER. These new values will be saved when/if
// the dialog is applied.
var schedulingStrategy = processor.config['schedulingStrategy'];
var executionNode = processor.config['executionNode'];
if (schedulingStrategy === 'PRIMARY_NODE_ONLY') {
executionNode = 'PRIMARY';
schedulingStrategy = 'TIMER_DRIVEN';
}
// initialize the scheduling strategy // initialize the scheduling strategy
$('#scheduling-strategy-combo').combo({ $('#scheduling-strategy-combo').combo({
options: getSchedulingStrategies(processor), options: getSchedulingStrategies(processor),
selectedOption: { selectedOption: {
value: processor.config['schedulingStrategy'] value: schedulingStrategy
}, },
select: function (selectedOption) { select: function (selectedOption) {
// show the appropriate panel // show the appropriate panel
@ -659,6 +671,16 @@ nf.ProcessorConfiguration = (function () {
} }
}); });
// select the execution node
$('#execution-node-combo').combo('setSelectedOption', {
value: executionNode
});
if (nf.Canvas.isClustered()) {
$('#execution-node-container').show();
} else {
$('#execution-node-container').hide();
}
// initialize the concurrentTasks // initialize the concurrentTasks
var defaultConcurrentTasks = processor.config['defaultConcurrentTasks']; var defaultConcurrentTasks = processor.config['defaultConcurrentTasks'];
$('#timer-driven-concurrently-schedulable-tasks').val(defaultConcurrentTasks['TIMER_DRIVEN']); $('#timer-driven-concurrently-schedulable-tasks').val(defaultConcurrentTasks['TIMER_DRIVEN']);
@ -667,9 +689,9 @@ nf.ProcessorConfiguration = (function () {
// get the appropriate concurrent tasks field // get the appropriate concurrent tasks field
var concurrentTasks; var concurrentTasks;
if (processor.config['schedulingStrategy'] === 'EVENT_DRIVEN') { if (schedulingStrategy === 'EVENT_DRIVEN') {
concurrentTasks = $('#event-driven-concurrently-schedulable-tasks').val(processor.config['concurrentlySchedulableTaskCount']); concurrentTasks = $('#event-driven-concurrently-schedulable-tasks').val(processor.config['concurrentlySchedulableTaskCount']);
} else if (processor.config['schedulingStrategy'] === 'CRON_DRIVEN') { } else if (schedulingStrategy === 'CRON_DRIVEN') {
concurrentTasks = $('#cron-driven-concurrently-schedulable-tasks').val(processor.config['concurrentlySchedulableTaskCount']); concurrentTasks = $('#cron-driven-concurrently-schedulable-tasks').val(processor.config['concurrentlySchedulableTaskCount']);
} else { } else {
concurrentTasks = $('#timer-driven-concurrently-schedulable-tasks').val(processor.config['concurrentlySchedulableTaskCount']); concurrentTasks = $('#timer-driven-concurrently-schedulable-tasks').val(processor.config['concurrentlySchedulableTaskCount']);

View File

@ -104,6 +104,7 @@ nf.ProcessorDetails = (function () {
nf.Common.clearField('read-only-yield-duration'); nf.Common.clearField('read-only-yield-duration');
nf.Common.clearField('read-only-run-duration'); nf.Common.clearField('read-only-run-duration');
nf.Common.clearField('read-only-bulletin-level'); nf.Common.clearField('read-only-bulletin-level');
nf.Common.clearField('read-only-execution-node');
nf.Common.clearField('read-only-execution-status'); nf.Common.clearField('read-only-execution-status');
nf.Common.clearField('read-only-processor-comments'); nf.Common.clearField('read-only-processor-comments');
@ -159,6 +160,7 @@ nf.ProcessorDetails = (function () {
// make the scheduling strategy human readable // make the scheduling strategy human readable
var schedulingStrategy = details.config['schedulingStrategy']; var schedulingStrategy = details.config['schedulingStrategy'];
var executionNode = details.config['executionNode'];
if (schedulingStrategy === 'EVENT_DRIVEN') { if (schedulingStrategy === 'EVENT_DRIVEN') {
showRunSchedule = false; showRunSchedule = false;
schedulingStrategy = 'Event driven'; schedulingStrategy = 'Event driven';
@ -166,8 +168,12 @@ nf.ProcessorDetails = (function () {
schedulingStrategy = 'CRON driven'; schedulingStrategy = 'CRON driven';
} else if (schedulingStrategy === 'TIMER_DRIVEN') { } else if (schedulingStrategy === 'TIMER_DRIVEN') {
schedulingStrategy = "Timer driven"; schedulingStrategy = "Timer driven";
} else { } else if (schedulingStrategy === 'PRIMARY_NODE_ONLY') {
schedulingStrategy = "On primary node"; // PRIMARY_NODE_ONLY has been deprecated as a
// schedulingStrategy option, and is now an
// executionNode option instead.
schedulingStrategy = "Timer driven";
executionNode = 'PRIMARY'
} }
nf.Common.populateField('read-only-scheduling-strategy', schedulingStrategy); nf.Common.populateField('read-only-scheduling-strategy', schedulingStrategy);
@ -178,6 +184,19 @@ nf.ProcessorDetails = (function () {
$('#read-only-run-schedule').hide(); $('#read-only-run-schedule').hide();
} }
// only show the execution-node when applicable
if (executionNode === 'ALL') {
executionNode = "All nodes";
} else if (executionNode === 'PRIMARY') {
executionNode = "Primary node only";
}
nf.Common.populateField('read-only-execution-node', executionNode);
if (nf.Canvas.isClustered()) {
$('#details-execution-node-container').show();
} else {
$('#details-execution-node-container').hide();
}
// load the relationship list // load the relationship list
if (!nf.Common.isEmpty(details.relationships)) { if (!nf.Common.isEmpty(details.relationships)) {
$.each(details.relationships, function (i, relationship) { $.each(details.relationships, function (i, relationship) {