From 4ae1fec78a77be5b9b06bd79fa22ae020f3365bc Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 24 Jun 2019 11:41:40 -0400 Subject: [PATCH] NIFI-6025: Include Processor 'scheduled state' (i.e., Enabled or Disabled) in the VersionedProcessor when pushing to Flow Registry and take into account when updating flows on the NiFi side NIFI-6025: Include difference in Scheduled State as a Local Flow Difference This closes #3546. Signed-off-by: Bryan Bende --- .../org/apache/nifi/groups/StandardProcessGroup.java | 10 ++++++++++ .../registry/flow/mapping/NiFiRegistryFlowMapper.java | 3 +++ .../java/org/apache/nifi/web/api/VersionsResource.java | 8 ++++---- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index b3e2a7c3a5..858d82656b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -458,6 +458,8 @@ public final class StandardProcessGroup implements ProcessGroup { } finally { readLock.unlock(); } + + onComponentModified(); } @Override @@ -477,6 +479,8 @@ public final class StandardProcessGroup implements ProcessGroup { } finally { readLock.unlock(); } + + onComponentModified(); } private StateManager getStateManager(final String componentId) { @@ -4544,6 +4548,12 @@ public final class StandardProcessGroup implements ProcessGroup { processor.setYieldPeriod(proposed.getYieldDuration()); processor.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY())); + if (proposed.getScheduledState() == org.apache.nifi.registry.flow.ScheduledState.DISABLED) { + disableProcessor(processor); + } else if (processor.getScheduledState() == ScheduledState.DISABLED) { + enableProcessor(processor); + } + if (!isEqual(processor.getBundleCoordinate(), proposed.getBundle())) { final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle()); final List descriptors = new ArrayList<>(processor.getProperties().keySet()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java index fdfc14387f..67be279ef4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java @@ -27,6 +27,7 @@ import org.apache.nifi.connectable.Port; import org.apache.nifi.controller.ComponentNode; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.PropertyConfiguration; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.queue.FlowFileQueue; @@ -544,6 +545,8 @@ public class NiFiRegistryFlowMapper { processor.setSchedulingStrategy(procNode.getSchedulingStrategy().name()); processor.setStyle(procNode.getStyle()); processor.setYieldDuration(procNode.getYieldPeriod()); + processor.setScheduledState(procNode.getScheduledState() == ScheduledState.DISABLED ? org.apache.nifi.registry.flow.ScheduledState.DISABLED + : org.apache.nifi.registry.flow.ScheduledState.ENABLED); return processor; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index 84744e2948..8aec9c9183 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -1214,7 +1214,7 @@ public class VersionsResource extends ApplicationResource { } catch (final ResumeFlowException rfe) { // Treat ResumeFlowException differently because we don't want to include a message that we couldn't update the flow // since in this case the flow was successfully updated - we just couldn't re-enable the components. - logger.error(rfe.getMessage(), rfe); + logger.warn(rfe.getMessage(), rfe); vcur.fail(rfe.getMessage()); } catch (final Exception e) { logger.error("Failed to update flow to new version", e); @@ -1415,7 +1415,7 @@ public class VersionsResource extends ApplicationResource { } catch (final ResumeFlowException rfe) { // Treat ResumeFlowException differently because we don't want to include a message that we couldn't update the flow // since in this case the flow was successfully updated - we just couldn't re-enable the components. - logger.error(rfe.getMessage(), rfe); + logger.warn(rfe.getMessage(), rfe); vcur.fail(rfe.getMessage()); } catch (final Exception e) { logger.error("Failed to update flow to new version", e); @@ -1586,7 +1586,7 @@ public class VersionsResource extends ApplicationResource { } catch (final IllegalStateException ise) { // Component Lifecycle will re-enable the Controller Services only if they are valid. If IllegalStateException gets thrown, we need to provide // a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated. - throw new ResumeFlowException("Failed to re-enable Controller Services because " + ise.getMessage(), ise); + throw new ResumeFlowException("Successfully updated flow but could not re-enable all Controller Services because " + ise.getMessage(), ise); } } @@ -1638,7 +1638,7 @@ public class VersionsResource extends ApplicationResource { } catch (final IllegalStateException ise) { // Component Lifecycle will restart the Processors only if they are valid. If IllegalStateException gets thrown, we need to provide // a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated. - throw new ResumeFlowException("Failed to restart components because " + ise.getMessage(), ise); + throw new ResumeFlowException("Successfully updated flow but could not restart all Processors because " + ise.getMessage(), ise); } } }