From 7eca2037bd602ecd2b26281d91ac3af9bc72a3f5 Mon Sep 17 00:00:00 2001 From: Brian Eugley Date: Thu, 9 Jun 2016 11:32:15 -0400 Subject: [PATCH] NIFI-401 Signed-off-by: jpercivall --- .../apache/nifi/scheduling/ExecutionNode.java | 22 +++++++ .../nifi/scheduling/SchedulingStrategy.java | 4 ++ .../nifi/web/api/dto/ProcessorConfigDTO.java | 17 ++++++ .../apache/nifi/controller/ProcessorNode.java | 5 ++ .../nifi/controller/FlowController.java | 7 +++ .../controller/StandardFlowSynchronizer.java | 5 ++ .../controller/StandardProcessorNode.java | 15 +++++ .../serialization/FlowFromDOMFactory.java | 9 +++ .../serialization/StandardFlowSerializer.java | 1 + .../nifi/fingerprint/FingerprintFactory.java | 1 + .../src/main/resources/FlowConfiguration.xsd | 11 +++- .../apache/nifi/audit/ProcessorAuditor.java | 4 ++ .../apache/nifi/web/api/dto/DtoFactory.java | 2 + .../nifi/web/controller/ControllerFacade.java | 7 +++ .../web/dao/impl/StandardProcessorDAO.java | 13 ++++ .../canvas/processor-configuration.jsp | 14 ++++- .../WEB-INF/partials/processor-details.jsp | 14 ++++- .../webapp/css/processor-configuration.css | 8 ++- .../src/main/webapp/css/processor-details.css | 4 +- .../nf/canvas/nf-processor-configuration.js | 60 +++++++++++++------ .../main/webapp/js/nf/nf-processor-details.js | 25 +++++++- 21 files changed, 220 insertions(+), 28 deletions(-) create mode 100644 nifi-api/src/main/java/org/apache/nifi/scheduling/ExecutionNode.java diff --git a/nifi-api/src/main/java/org/apache/nifi/scheduling/ExecutionNode.java b/nifi-api/src/main/java/org/apache/nifi/scheduling/ExecutionNode.java new file mode 100644 index 0000000000..f479732239 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/scheduling/ExecutionNode.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.scheduling; + +public enum ExecutionNode { + ALL, + PRIMARY; +} diff --git a/nifi-api/src/main/java/org/apache/nifi/scheduling/SchedulingStrategy.java b/nifi-api/src/main/java/org/apache/nifi/scheduling/SchedulingStrategy.java index ccf4281e5e..21bf130a38 100644 --- a/nifi-api/src/main/java/org/apache/nifi/scheduling/SchedulingStrategy.java +++ b/nifi-api/src/main/java/org/apache/nifi/scheduling/SchedulingStrategy.java @@ -56,6 +56,10 @@ public enum SchedulingStrategy { */ TIMER_DRIVEN(1, "0 sec"), /** + * NOTE: This option has been deprecated with the addition of the + * execution-node combo box. It still exists for backward compatibility + * with existing flows that still have this value for schedulingStrategy. + ** * Indicates that the component will be scheduled via timer only on the * Primary Node. If the instance is not part of a cluster and this * Scheduling Strategy is used, the component will be scheduled in the same diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java index a9006c9bf6..1e4e92f777 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessorConfigDTO.java @@ -34,6 +34,7 @@ public class ProcessorConfigDTO { // settings private String schedulingPeriod; private String schedulingStrategy; + private String executionNode; private String penaltyDuration; private String yieldDuration; private String bulletinLevel; @@ -86,6 +87,22 @@ public class ProcessorConfigDTO { this.schedulingStrategy = schedulingStrategy; } + /** + * Indicates which node the process should run on + * + * @return execution node + */ + @ApiModelProperty( + value = "Indicates the node where the process will execute." + ) + public String getExecutionNode() { + return executionNode; + } + + public void setExecutionNode(String executionNode) { + this.executionNode = executionNode; + } + /** * @return the amount of time that is used when this processor penalizes a flowfile */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java index 08b4abe34c..8f405e04a4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java @@ -35,6 +35,7 @@ import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.scheduling.ExecutionNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,6 +88,10 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen @Override public abstract SchedulingStrategy getSchedulingStrategy(); + public abstract void setExecutionNode(ExecutionNode executionNode); + + public abstract ExecutionNode getExecutionNode(); + public abstract void setRunDuration(long duration, TimeUnit timeUnit); public abstract long getRunDuration(TimeUnit timeUnit); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index ba5ed36488..5f1d34cb82 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -207,6 +207,7 @@ import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.reporting.Severity; import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.stream.io.LimitingInputStream; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.ComponentIdGenerator; @@ -481,6 +482,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry); final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.variableRegistry, this.nifiProperties); processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent); + // PRIMARY_NODE_ONLY is deprecated, but still exists to handle processors that are still defined with it (they haven't been + // re-configured with executeNode = PRIMARY). processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent); processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, quartzSchedulingAgent); processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this, contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS); @@ -1709,6 +1712,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy())); } + if (config.getExecutionNode() != null) { + procNode.setExecutionNode(ExecutionNode.valueOf(config.getExecutionNode())); + } + // ensure that the scheduling strategy is set prior to these values procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount()); procNode.setScheduldingPeriod(config.getSchedulingPeriod()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 8cfb3f3371..1d7ebdea12 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -88,6 +88,7 @@ import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.ReportingInitializationContext; import org.apache.nifi.reporting.Severity; import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.util.DomUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.file.FileUtils; @@ -902,6 +903,10 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy())); } + if (config.getExecutionNode() != null) { + procNode.setExecutionNode(ExecutionNode.valueOf(config.getExecutionNode())); + } + // must set scheduling strategy before these two procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount()); procNode.setScheduldingPeriod(config.getSchedulingPeriod()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 5ff9f2201d..a76557790a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -56,6 +56,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ReflectionUtils; @@ -134,6 +135,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable private SchedulingStrategy schedulingStrategy; // guarded by read/write lock // ??????? NOT any more + private ExecutionNode executionNode; public StandardProcessorNode(final Processor processor, final String uuid, final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, @@ -190,6 +192,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN; + executionNode = ExecutionNode.ALL; } /** @@ -426,6 +429,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable } this.schedulingStrategy = schedulingStrategy; + // PRIMARY_NODE_ONLY is deprecated. Isolated is also set when executionNode == PRIMARY setIsolated(schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY); } @@ -476,6 +480,17 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable this.schedulingPeriod.set(schedulingPeriod); } + @Override + public void setExecutionNode(final ExecutionNode executionNode) { + this.executionNode = executionNode; + setIsolated(executionNode == ExecutionNode.PRIMARY); + } + + @Override + public ExecutionNode getExecutionNode() { + return this.executionNode; + } + @Override public long getRunDuration(final TimeUnit timeUnit) { return timeUnit.convert(this.runNanos, TimeUnit.NANOSECONDS); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java index 2c51e96f27..f1e4232c04 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java @@ -32,6 +32,7 @@ import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor; import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.util.DomUtils; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; @@ -374,6 +375,14 @@ public class FlowFromDOMFactory { configDto.setSchedulingStrategy(schedulingStrategyName.trim()); } + // handle execution node + final String executionNode = getString(element, "executionNode"); + if (executionNode == null || executionNode.trim().isEmpty()) { + configDto.setExecutionNode(ExecutionNode.ALL.name()); + } else { + configDto.setExecutionNode(executionNode.trim()); + } + final Long runDurationNanos = getOptionalLong(element, "runDurationNanos"); if (runDurationNanos != null) { configDto.setRunDurationMillis(TimeUnit.NANOSECONDS.toMillis(runDurationNanos)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index d04433c7b8..0ead668f23 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -344,6 +344,7 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(element, "lossTolerant", String.valueOf(processor.isLossTolerant())); addTextElement(element, "scheduledState", processor.getScheduledState().name()); addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name()); + addTextElement(element, "executionNode", processor.getExecutionNode().name()); addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS)); addConfiguration(element, processor.getProperties(), processor.getAnnotationData(), encryptor); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index f3a4cbb732..0ab39a2ce3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -519,6 +519,7 @@ public class FingerprintFactory { builder.append(config.getComments()); builder.append(config.getSchedulingPeriod()); builder.append(config.getSchedulingStrategy()); + builder.append(config.getExecutionNode()); builder.append(config.getYieldDuration()); builder.append(config.getConcurrentlySchedulableTaskCount()); builder.append(config.getPenaltyDuration()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd index c1e66e59b7..adc83be4cb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd @@ -87,6 +87,8 @@ + +