diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java index 9ef3b9fa39..1b971292fa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java @@ -292,7 +292,7 @@ public final class StandardConnection implements Connection { } if (previousDestination.isRunning() && !(previousDestination instanceof Funnel || previousDestination instanceof LocalPort)) { - throw new IllegalStateException("Cannot change destination of Connection because the current destination is running"); + throw new IllegalStateException("Cannot change destination of Connection because the current destination ([%s]) is running".formatted(previousDestination)); } if (getFlowFileQueue().isUnacknowledgedFlowFile()) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java index d687deee3d..74ffd715b3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java @@ -244,7 +244,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen try { final Map parameterProviderReferences = versionedExternalFlow.getParameterProviders() == null ? new HashMap<>() : versionedExternalFlow.getParameterProviders(); - final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() != null ? context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId()) : group; + final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() == null ? group : context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId()); synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(), parameterProviderReferences, topLevelGroup, syncOptions.isUpdateSettings()); } catch (final ProcessorInstantiationException pie) { @@ -702,6 +702,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen connectionsWithTempDestination.add(proposedConnection.getIdentifier()); } + LOG.debug("Changing destination of Connection {} from {} to {}", connection, connection.getDestination(), newDestination); connection.setDestination(newDestination); } @@ -733,6 +734,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen private boolean isTempDestinationNecessary(final Connection existingConnection, final VersionedConnection proposedConnection, final Connectable newDestination) { if (newDestination == null) { + LOG.debug("Will use a temporary destination for {} because its destination doesn't yet exist", existingConnection); return true; } @@ -741,26 +743,51 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen final boolean port = connectableType == ConnectableType.OUTPUT_PORT || connectableType == ConnectableType.INPUT_PORT; final boolean groupChanged = !newDestination.getProcessGroup().equals(existingConnection.getProcessGroup()); if (port && groupChanged) { + LOG.debug("Will use a temporary destination for {} because its destination is a port whose group has changed", existingConnection); return true; } // If the proposed destination has a different group than the existing group, use a temp destination. final String proposedDestinationGroupId = proposedConnection.getDestination().getGroupId(); - final String destinationGroupVersionedComponentId = existingConnection.getDestination().getProcessGroup().getVersionedComponentId().orElse(null); + final String destinationGroupVersionedComponentId = getVersionedId(existingConnection.getDestination().getProcessGroup()); if (!Objects.equals(proposedDestinationGroupId, destinationGroupVersionedComponentId)) { + LOG.debug("Will use a temporary destination for {} because its destination has a different group than the existing group. " + + "Existing group ID is [{}] (instance ID of [{}]); proposed is [{}]", + existingConnection, destinationGroupVersionedComponentId, existingConnection.getProcessGroup().getIdentifier(), proposedDestinationGroupId); return true; } // If the proposed connection exists in a different group than the existing group, use a temp destination. - final String connectionGroupVersionedComponentId = existingConnection.getProcessGroup().getVersionedComponentId().orElse(null); + final String connectionGroupVersionedComponentId = getVersionedId(existingConnection.getProcessGroup()); final String proposedGroupId = proposedConnection.getGroupIdentifier(); if (!Objects.equals(proposedGroupId, connectionGroupVersionedComponentId)) { + LOG.debug("Will use a temporary destination for {} because it has a different group than the existing group. Existing group ID is [{}]; proposed is [{}]", + existingConnection, connectionGroupVersionedComponentId, proposedGroupId); return true; } return false; } + private String getVersionedId(final ProcessGroup processGroup) { + return getVersionedId(processGroup.getIdentifier(), processGroup.getVersionedComponentId().orElse(null)); + } + + /** + * Determines the Versioned Component ID to use for a component by first using the Versioned ID if it is already available. Otherwise, + * use the Component ID Lookup to determine the Versioned ID based on the Instance ID. This allows us to ensure that when we sync the dataflow, + * we use the same approach to mapping as we will use when we write out the dataflow. + * + * @return the Versioned Component ID to use for the component + */ + private String getVersionedId(final String instanceId, final String versionedId) { + if (versionedId != null) { + return versionedId; + } + + return this.context.getFlowMappingOptions().getComponentIdLookup().getComponentId(Optional.empty(), instanceId); + } + private Funnel getTemporaryFunnel(final ProcessGroup group) { final String tempFunnelId = group.getIdentifier() + TEMP_FUNNEL_ID_SUFFIX; Funnel temporaryFunnel = context.getFlowManager().getFunnel(tempFunnelId); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index e9db230087..8721273373 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -3443,10 +3443,12 @@ public final class StandardProcessGroup implements ProcessGroup { if (currentId == null) { versionedComponentId.set(componentId); + LOG.info("Set Versioned Component ID of {} to {}", this, componentId); } else if (currentId.equals(componentId)) { return; } else if (componentId == null) { versionedComponentId.set(null); + LOG.info("Cleared Versioned Component ID for {}", this); } else { throw new IllegalStateException(this + " is already under version control with a different Versioned Component ID"); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 53ad12c56c..17e8c9e013 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -2713,8 +2713,17 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr } public void onClusterDisconnect() { - leaderElectionManager.unregister(ClusterRoles.PRIMARY_NODE); - leaderElectionManager.unregister(ClusterRoles.CLUSTER_COORDINATOR); + try { + leaderElectionManager.unregister(ClusterRoles.PRIMARY_NODE); + } catch (final Exception e) { + LOG.warn("Failed to unregister this node as a Primary Node candidate", e); + } + + try { + leaderElectionManager.unregister(ClusterRoles.CLUSTER_COORDINATOR); + } catch (final Exception e) { + LOG.warn("Failed to unregister this node as a Cluster Coordinator candidate", e); + } } public LeaderElectionManager getLeaderElectionManager() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-zookeeper-leader-election/src/main/java/org/apache/nifi/framework/cluster/leader/zookeeper/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-zookeeper-leader-election/src/main/java/org/apache/nifi/framework/cluster/leader/zookeeper/CuratorLeaderElectionManager.java index 62c77181f1..045671652f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-zookeeper-leader-election/src/main/java/org/apache/nifi/framework/cluster/leader/zookeeper/CuratorLeaderElectionManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-zookeeper-leader-election/src/main/java/org/apache/nifi/framework/cluster/leader/zookeeper/CuratorLeaderElectionManager.java @@ -27,10 +27,10 @@ import org.apache.curator.framework.recipes.leader.Participant; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.utils.ZookeeperFactory; +import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener; import org.apache.nifi.controller.leader.election.TrackedLeaderElectionManager; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.framework.cluster.zookeeper.ZooKeeperClientConfig; -import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener; import org.apache.nifi.util.NiFiProperties; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; @@ -177,7 +177,14 @@ public class CuratorLeaderElectionManager extends TrackedLeaderElectionManager { leaderRole.getElectionListener().disable(); - leaderSelector.close(); + try { + leaderSelector.close(); + } catch (final Exception e) { + // LeaderSelector will throw an IllegalStateException if it is not in the STARTED state. + // However, it exposes no method to check its state, so we have to catch the exception and ignore it. + logger.debug("Failed to close Leader Selector when unregistering for Role '{}'", roleName, e); + } + logger.info("Unregistered for Election: Role [{}]", roleName); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java index 5a64679ce7..526cc6906c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java @@ -851,7 +851,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader { flowService.stop(false); } logger.error("Failed to start Flow Service", e); - throw new Exception("Failed to start Flow Service" + e); // cannot wrap the exception as they are not defined in a classloader accessible to the caller + throw new Exception("Failed to start Flow Service: " + e); // cannot wrap the exception as they are not defined in a classloader accessible to the caller } } @@ -964,7 +964,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader { private void startUpFailure(Throwable t) { System.err.println("Failed to start web server: " + t.getMessage()); System.err.println("Shutting down..."); - logger.warn("Failed to start web server... shutting down.", t); + logger.error("Failed to start web server... shutting down.", t); System.exit(1); } diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java index 4d1b98ca83..fab3ef8d51 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java @@ -756,6 +756,46 @@ public class FlowSynchronizationIT extends NiFiSystemIT { } + @Test + public void testReconnectWithRunningProcessorUnchanged() throws NiFiClientException, IOException, InterruptedException { + final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile"); + final ProcessorEntity reverseContents = getClientUtil().createProcessor("ReverseContents"); + final ProcessorEntity terminateFlowFile = getClientUtil().createProcessor("TerminateFlowFile"); + getClientUtil().createConnection(generateFlowFile, reverseContents, "success"); + getClientUtil().createConnection(reverseContents, terminateFlowFile, "success"); + + getClientUtil().waitForValidProcessor(generateFlowFile.getId()); + getClientUtil().waitForValidProcessor(reverseContents.getId()); + getClientUtil().waitForValidProcessor(terminateFlowFile.getId()); + + getClientUtil().startProcessor(reverseContents); + + disconnectNode(2); + reconnectNode(2); + waitForAllNodesConnected(); + } + + @Test + public void testReconnectWithRunningProcessorUnchangedInChildGroup() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity group = getClientUtil().createProcessGroup("testReconnectWithRunningProcessorUnchangedInChildGroup", "root"); + final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile", group.getId()); + final ProcessorEntity reverseContents = getClientUtil().createProcessor("ReverseContents", group.getId()); + final ProcessorEntity terminateFlowFile = getClientUtil().createProcessor("TerminateFlowFile", group.getId()); + getClientUtil().createConnection(generateFlowFile, reverseContents, "success", group.getId()); + getClientUtil().createConnection(reverseContents, terminateFlowFile, "success", group.getId()); + + getClientUtil().waitForValidProcessor(generateFlowFile.getId()); + getClientUtil().waitForValidProcessor(reverseContents.getId()); + getClientUtil().waitForValidProcessor(terminateFlowFile.getId()); + + getClientUtil().startProcessor(reverseContents); + + disconnectNode(2); + reconnectNode(2); + waitForAllNodesConnected(); + } + + private VersionedDataflow getNode2Flow() throws IOException { final File instanceDir = getNiFiInstance().getNodeInstance(2).getInstanceDirectory(); final File conf = new File(instanceDir, "conf");