NIFI-12232 Corrected Group Component ID Handling for Clustered Flows

Ensured that if a Process Group doesn't have a Versioned Component ID we use the ComponentIdLookup to create one based on its Instance ID in the same way that is done when serializing the flow; this ensures matching ID's when we synchronize flows across the cluster. Also included some code cleanup around failure handling on startup

This closes #8406

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2024-02-14 12:14:12 -05:00 committed by exceptionfactory
parent 0f4defa49a
commit a821966a87
No known key found for this signature in database
7 changed files with 95 additions and 10 deletions

View File

@ -292,7 +292,7 @@ public final class StandardConnection implements Connection {
}
if (previousDestination.isRunning() && !(previousDestination instanceof Funnel || previousDestination instanceof LocalPort)) {
throw new IllegalStateException("Cannot change destination of Connection because the current destination is running");
throw new IllegalStateException("Cannot change destination of Connection because the current destination ([%s]) is running".formatted(previousDestination));
}
if (getFlowFileQueue().isUnacknowledgedFlowFile()) {

View File

@ -244,7 +244,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
try {
final Map<String, ParameterProviderReference> parameterProviderReferences = versionedExternalFlow.getParameterProviders() == null
? new HashMap<>() : versionedExternalFlow.getParameterProviders();
final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() != null ? context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId()) : group;
final ProcessGroup topLevelGroup = syncOptions.getTopLevelGroupId() == null ? group : context.getFlowManager().getGroup(syncOptions.getTopLevelGroupId());
synchronize(group, versionedExternalFlow.getFlowContents(), versionedExternalFlow.getParameterContexts(),
parameterProviderReferences, topLevelGroup, syncOptions.isUpdateSettings());
} catch (final ProcessorInstantiationException pie) {
@ -702,6 +702,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
connectionsWithTempDestination.add(proposedConnection.getIdentifier());
}
LOG.debug("Changing destination of Connection {} from {} to {}", connection, connection.getDestination(), newDestination);
connection.setDestination(newDestination);
}
@ -733,6 +734,7 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
private boolean isTempDestinationNecessary(final Connection existingConnection, final VersionedConnection proposedConnection, final Connectable newDestination) {
if (newDestination == null) {
LOG.debug("Will use a temporary destination for {} because its destination doesn't yet exist", existingConnection);
return true;
}
@ -741,26 +743,51 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
final boolean port = connectableType == ConnectableType.OUTPUT_PORT || connectableType == ConnectableType.INPUT_PORT;
final boolean groupChanged = !newDestination.getProcessGroup().equals(existingConnection.getProcessGroup());
if (port && groupChanged) {
LOG.debug("Will use a temporary destination for {} because its destination is a port whose group has changed", existingConnection);
return true;
}
// If the proposed destination has a different group than the existing group, use a temp destination.
final String proposedDestinationGroupId = proposedConnection.getDestination().getGroupId();
final String destinationGroupVersionedComponentId = existingConnection.getDestination().getProcessGroup().getVersionedComponentId().orElse(null);
final String destinationGroupVersionedComponentId = getVersionedId(existingConnection.getDestination().getProcessGroup());
if (!Objects.equals(proposedDestinationGroupId, destinationGroupVersionedComponentId)) {
LOG.debug("Will use a temporary destination for {} because its destination has a different group than the existing group. " +
"Existing group ID is [{}] (instance ID of [{}]); proposed is [{}]",
existingConnection, destinationGroupVersionedComponentId, existingConnection.getProcessGroup().getIdentifier(), proposedDestinationGroupId);
return true;
}
// If the proposed connection exists in a different group than the existing group, use a temp destination.
final String connectionGroupVersionedComponentId = existingConnection.getProcessGroup().getVersionedComponentId().orElse(null);
final String connectionGroupVersionedComponentId = getVersionedId(existingConnection.getProcessGroup());
final String proposedGroupId = proposedConnection.getGroupIdentifier();
if (!Objects.equals(proposedGroupId, connectionGroupVersionedComponentId)) {
LOG.debug("Will use a temporary destination for {} because it has a different group than the existing group. Existing group ID is [{}]; proposed is [{}]",
existingConnection, connectionGroupVersionedComponentId, proposedGroupId);
return true;
}
return false;
}
private String getVersionedId(final ProcessGroup processGroup) {
return getVersionedId(processGroup.getIdentifier(), processGroup.getVersionedComponentId().orElse(null));
}
/**
* Determines the Versioned Component ID to use for a component by first using the Versioned ID if it is already available. Otherwise,
* use the Component ID Lookup to determine the Versioned ID based on the Instance ID. This allows us to ensure that when we sync the dataflow,
* we use the same approach to mapping as we will use when we write out the dataflow.
*
* @return the Versioned Component ID to use for the component
*/
private String getVersionedId(final String instanceId, final String versionedId) {
if (versionedId != null) {
return versionedId;
}
return this.context.getFlowMappingOptions().getComponentIdLookup().getComponentId(Optional.empty(), instanceId);
}
private Funnel getTemporaryFunnel(final ProcessGroup group) {
final String tempFunnelId = group.getIdentifier() + TEMP_FUNNEL_ID_SUFFIX;
Funnel temporaryFunnel = context.getFlowManager().getFunnel(tempFunnelId);

View File

@ -3443,10 +3443,12 @@ public final class StandardProcessGroup implements ProcessGroup {
if (currentId == null) {
versionedComponentId.set(componentId);
LOG.info("Set Versioned Component ID of {} to {}", this, componentId);
} else if (currentId.equals(componentId)) {
return;
} else if (componentId == null) {
versionedComponentId.set(null);
LOG.info("Cleared Versioned Component ID for {}", this);
} else {
throw new IllegalStateException(this + " is already under version control with a different Versioned Component ID");
}

View File

@ -2713,8 +2713,17 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr
}
public void onClusterDisconnect() {
leaderElectionManager.unregister(ClusterRoles.PRIMARY_NODE);
leaderElectionManager.unregister(ClusterRoles.CLUSTER_COORDINATOR);
try {
leaderElectionManager.unregister(ClusterRoles.PRIMARY_NODE);
} catch (final Exception e) {
LOG.warn("Failed to unregister this node as a Primary Node candidate", e);
}
try {
leaderElectionManager.unregister(ClusterRoles.CLUSTER_COORDINATOR);
} catch (final Exception e) {
LOG.warn("Failed to unregister this node as a Cluster Coordinator candidate", e);
}
}
public LeaderElectionManager getLeaderElectionManager() {

View File

@ -27,10 +27,10 @@ import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.ZookeeperFactory;
import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener;
import org.apache.nifi.controller.leader.election.TrackedLeaderElectionManager;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.framework.cluster.zookeeper.ZooKeeperClientConfig;
import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener;
import org.apache.nifi.util.NiFiProperties;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
@ -177,7 +177,14 @@ public class CuratorLeaderElectionManager extends TrackedLeaderElectionManager {
leaderRole.getElectionListener().disable();
leaderSelector.close();
try {
leaderSelector.close();
} catch (final Exception e) {
// LeaderSelector will throw an IllegalStateException if it is not in the STARTED state.
// However, it exposes no method to check its state, so we have to catch the exception and ignore it.
logger.debug("Failed to close Leader Selector when unregistering for Role '{}'", roleName, e);
}
logger.info("Unregistered for Election: Role [{}]", roleName);
}

View File

@ -851,7 +851,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
flowService.stop(false);
}
logger.error("Failed to start Flow Service", e);
throw new Exception("Failed to start Flow Service" + e); // cannot wrap the exception as they are not defined in a classloader accessible to the caller
throw new Exception("Failed to start Flow Service: " + e); // cannot wrap the exception as they are not defined in a classloader accessible to the caller
}
}
@ -964,7 +964,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
private void startUpFailure(Throwable t) {
System.err.println("Failed to start web server: " + t.getMessage());
System.err.println("Shutting down...");
logger.warn("Failed to start web server... shutting down.", t);
logger.error("Failed to start web server... shutting down.", t);
System.exit(1);
}

View File

@ -756,6 +756,46 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
}
@Test
public void testReconnectWithRunningProcessorUnchanged() throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile");
final ProcessorEntity reverseContents = getClientUtil().createProcessor("ReverseContents");
final ProcessorEntity terminateFlowFile = getClientUtil().createProcessor("TerminateFlowFile");
getClientUtil().createConnection(generateFlowFile, reverseContents, "success");
getClientUtil().createConnection(reverseContents, terminateFlowFile, "success");
getClientUtil().waitForValidProcessor(generateFlowFile.getId());
getClientUtil().waitForValidProcessor(reverseContents.getId());
getClientUtil().waitForValidProcessor(terminateFlowFile.getId());
getClientUtil().startProcessor(reverseContents);
disconnectNode(2);
reconnectNode(2);
waitForAllNodesConnected();
}
@Test
public void testReconnectWithRunningProcessorUnchangedInChildGroup() throws NiFiClientException, IOException, InterruptedException {
final ProcessGroupEntity group = getClientUtil().createProcessGroup("testReconnectWithRunningProcessorUnchangedInChildGroup", "root");
final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile", group.getId());
final ProcessorEntity reverseContents = getClientUtil().createProcessor("ReverseContents", group.getId());
final ProcessorEntity terminateFlowFile = getClientUtil().createProcessor("TerminateFlowFile", group.getId());
getClientUtil().createConnection(generateFlowFile, reverseContents, "success", group.getId());
getClientUtil().createConnection(reverseContents, terminateFlowFile, "success", group.getId());
getClientUtil().waitForValidProcessor(generateFlowFile.getId());
getClientUtil().waitForValidProcessor(reverseContents.getId());
getClientUtil().waitForValidProcessor(terminateFlowFile.getId());
getClientUtil().startProcessor(reverseContents);
disconnectNode(2);
reconnectNode(2);
waitForAllNodesConnected();
}
private VersionedDataflow getNode2Flow() throws IOException {
final File instanceDir = getNiFiInstance().getNodeInstance(2).getInstanceDirectory();
final File conf = new File(instanceDir, "conf");