From 118667a60182b5a73ac8cdd3c843f8827d9b4389 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 2 Jan 2018 13:09:09 -0500 Subject: [PATCH] NIFI-4436: Bug fixes Signed-off-by: Matt Gilman --- .../nifi/groups/StandardProcessGroup.java | 34 +++++++++++++++++-- .../web/dao/impl/StandardProcessGroupDAO.java | 1 + 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index bc5ef29533..83468cf9c0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -3284,7 +3284,23 @@ public final class StandardProcessGroup implements ProcessGroup { } final Set knownVariables = getKnownVariableNames(); - updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings, updateDescendantVersionedFlows, knownVariables); + + final StandardVersionControlInformation originalVci = this.versionControlInfo.get(); + try { + updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings, updateDescendantVersionedFlows, knownVariables); + } catch (final Throwable t) { + // The proposed snapshot may not have any Versioned Flow Coordinates. As a result, the call to #updateProcessGroup may + // set this PG's Version Control Info to null. During the normal flow of control, + // the Version Control Information is set appropriately at the end. However, if an Exception is thrown, we need to ensure + // that we don't leave the Version Control Info as null. It's also important to note here that the Atomic Reference is used + // as a means of retrieving the value without obtaining a read lock, but the value is never updated outside of a write lock. + // As a result, it is safe to use the get() and then the set() methods of the AtomicReference without introducing the 'check-then-modify' problem. + if (this.versionControlInfo.get() == null) { + this.versionControlInfo.set(originalVci); + } + + throw t; + } } catch (final ProcessorInstantiationException pie) { throw new IllegalStateException("Failed to update flow", pie); } finally { @@ -3613,7 +3629,7 @@ public final class StandardProcessGroup implements ProcessGroup { if (connection == null) { final Connection added = addConnection(group, proposedConnection, componentIdSeed); LOG.info("Added {} to {}", added, this); - } else if (!connection.getSource().isRunning() && !connection.getDestination().isRunning()) { + } else if (isUpdateable(connection)) { // If the connection needs to be updated, then the source and destination will already have // been stopped (else, the validation above would fail). So if the source or the destination is running, // then we know that we don't need to update the connection. @@ -3690,6 +3706,20 @@ public final class StandardProcessGroup implements ProcessGroup { } } + private boolean isUpdateable(final Connection connection) { + final Connectable source = connection.getSource(); + if (source.getConnectableType() != ConnectableType.FUNNEL && source.isRunning()) { + return false; + } + + final Connectable destination = connection.getDestination(); + if (destination.getConnectableType() != ConnectableType.FUNNEL && destination.isRunning()) { + return false; + } + + return true; + } + private String generateUuid(final String propposedId, final String destinationGroupId, final String seed) { long msb = UUID.nameUUIDFromBytes((propposedId + destinationGroupId).getBytes(StandardCharsets.UTF_8)).getMostSignificantBits(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java index e7e85af04c..5bbb56ffdd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java @@ -275,6 +275,7 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou public ProcessGroup disconnectVersionControl(final String groupId) { final ProcessGroup group = locateProcessGroup(flowController, groupId); group.disconnectVersionControl(true); + group.onComponentModified(); return group; }