From 8e1c79eaafe886e85e4ceaf8436b961eccfef568 Mon Sep 17 00:00:00 2001 From: Jeff Storck Date: Mon, 15 May 2017 17:21:11 -0400 Subject: [PATCH] NIFI-1963 Allows a node reconnecting to the cluster to inherit non-fingerprinted processor settings Forces a node reconnecting to a cluster to serialize the updated flow to disk Added most processor settings to the flow fingerprint (excluding name, style, comment, position, and schedule state) Updated some test data for FingerprintFactoryTest to test for new fields added to the flow fingerprint Updated StandardProcessorNode to allow processor comments and name to be settable while a processor is running Updated StandardFlowSynchronizer to inherit non-fingerprinted processor settings (name, style, comment, and position) when flow is already synchronized This closes #1812 --- .../nifi/controller/StandardFlowService.java | 4 ++- .../controller/StandardFlowSynchronizer.java | 14 +++++++--- .../controller/StandardProcessorNode.java | 6 ----- .../nifi/fingerprint/FingerprintFactory.java | 19 +++++++++++++ .../resources/nifi/fingerprint/flow1a.xml | 17 +++++++++--- .../resources/nifi/fingerprint/flow1b.xml | 27 ++++++++++++++----- .../test/resources/nifi/fingerprint/flow2.xml | 17 +++++++++--- 7 files changed, 80 insertions(+), 24 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 0ce6742862..b2c16281a7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -643,7 +643,9 @@ public class StandardFlowService implements FlowService, ProtocolHandler { clusterCoordinator.resetNodeStatuses(connectionResponse.getNodeConnectionStatuses().stream() .collect(Collectors.toMap(status -> status.getNodeIdentifier(), status -> status))); - controller.resumeHeartbeats(); // we are now connected, so resume sending heartbeats. + // reconnected, this node needs to explicitly write the inherited flow to disk, and resume heartbeats + saveFlowChanges(); + controller.resumeHeartbeats(); logger.info("Node reconnected."); } catch (final Exception ex) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 09338c95d9..975f95425d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -725,6 +725,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final ProcessorDTO dto = FlowFromDOMFactory.getProcessor(processorElement, encryptor); final ProcessorNode procNode = processGroup.getProcessor(dto.getId()); + updateNonFingerprintedProcessorSettings(procNode, dto); + if (!procNode.getScheduledState().name().equals(dto.getState())) { try { switch (ScheduledState.valueOf(dto.getState())) { @@ -964,15 +966,12 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { private void updateProcessor(final ProcessorNode procNode, final ProcessorDTO processorDTO, final ProcessGroup processGroup, final FlowController controller) throws ProcessorInstantiationException { final ProcessorConfigDTO config = processorDTO.getConfig(); - procNode.setPosition(toPosition(processorDTO.getPosition())); - procNode.setName(processorDTO.getName()); - procNode.setStyle(processorDTO.getStyle()); procNode.setProcessGroup(processGroup); - procNode.setComments(config.getComments()); procNode.setLossTolerant(config.isLossTolerant()); procNode.setPenalizationPeriod(config.getPenaltyDuration()); procNode.setYieldPeriod(config.getYieldDuration()); procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel())); + updateNonFingerprintedProcessorSettings(procNode, processorDTO); if (config.getSchedulingStrategy() != null) { procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy())); @@ -1011,6 +1010,13 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { } } + private void updateNonFingerprintedProcessorSettings(final ProcessorNode procNode, final ProcessorDTO processorDTO) { + procNode.setName(processorDTO.getName()); + procNode.setPosition(toPosition(processorDTO.getPosition())); + procNode.setStyle(processorDTO.getStyle()); + procNode.setComments(processorDTO.getConfig().getComments()); + } + private ProcessGroup addProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor, final FlowEncodingVersion encodingVersion) throws ProcessorInstantiationException { // get the parent group ID diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 1a1acc0fb4..6d96a5c24d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -256,9 +256,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable */ @Override public synchronized void setComments(final String comments) { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } this.comments.set(CharacterFilterUtils.filterInvalidXmlCharacters(comments)); } @@ -405,9 +402,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable @Override public synchronized void setName(final String name) { - if (isRunning()) { - throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running"); - } super.setName(name); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index d9e048e713..1ef3e8b5c7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -341,6 +341,25 @@ public class FingerprintFactory { final BundleDTO bundle = FlowFromDOMFactory.getBundle(DomUtils.getChild(processorElem, "bundle")); addBundleFingerprint(builder, bundle); + // max concurrent tasks + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "maxConcurrentTasks")); + // scheduling period + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "schedulingPeriod")); + // penalization period + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "penalizationPeriod")); + // yield period + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "yieldPeriod")); + // bulletin level + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "bulletinLevel")); + // loss tolerant + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "lossTolerant")); + // scheduling strategy + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "schedulingStrategy")); + // execution node + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "executionNode")); + // run duration nanos + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "runDurationNanos")); + // get the temp instance of the Processor so that we know the default property values final BundleCoordinate coordinate = getCoordinate(className, bundle); final ConfigurableComponent configurableComponent = ExtensionManager.getTempComponent(className, coordinate); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1a.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1a.xml index beccdccf26..28967673a5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1a.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1a.xml @@ -25,13 +25,24 @@ d89ada5d-35fb-44ff-83f1-4cc00b48b2df GenerateFlowFile - + org.apache.nifi.processors.standard.GenerateFlowFile + + org.apache.nifi + nifi-standard-nar + 1.3.0-SNAPSHOT + 1 - 0 s + 0 sec + 30 sec + 1 sec + WARN false - false + RUNNING + TIMER_DRIVEN + ALL + 0 file.size 5 diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1b.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1b.xml index 19ed079370..e8d95db801 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1b.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1b.xml @@ -23,15 +23,28 @@ d89ada5d-35fb-44ff-83f1-4cc00b48b2df - GenerateFlowFile - - - + GenerateFlowFile1 + + + + + this is a comment org.apache.nifi.processors.standard.GenerateFlowFile + + org.apache.nifi + nifi-standard-nar + 1.3.0-SNAPSHOT + 1 - 0 s + 0 sec + 30 sec + 1 sec + WARN false - false + DISABLED + TIMER_DRIVEN + ALL + 0 file.size 5 @@ -45,7 +58,7 @@ org.apache.nifi.processors.standard.LogAttribute - 10 + 1 0 s false false diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow2.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow2.xml index 8c0e641ba6..bab1778ad0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow2.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow2.xml @@ -25,13 +25,24 @@ d89ada5d-35fb-44ff-83f1-4cc00b48b2dd GenerateFlowFile - + org.apache.nifi.processors.standard.GenerateFlowFile + + org.apache.nifi + nifi-standard-nar + 1.4.0-SNAPSHOT + 1 0 s - false - false + 30 s + 1 s + ERROR + true + RUNNING + CRON_DRIVEN + PRIMARY + 1 file.size 5