diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index b31530f631..514d928656 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -35,6 +35,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.commons.collections4.queue.CircularFifoQueue; +import org.apache.commons.lang3.StringUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -770,6 +771,55 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl } } + private String summarizeStatusChange(final NodeConnectionStatus oldStatus, final NodeConnectionStatus status) { + final StringBuilder sb = new StringBuilder(); + + if (oldStatus != null && status.getState() == oldStatus.getState()) { + // Check if roles changed + final Set oldRoles = oldStatus.getRoles(); + final Set newRoles = status.getRoles(); + + final Set rolesRemoved = new HashSet<>(oldRoles); + rolesRemoved.removeAll(newRoles); + + final Set rolesAdded = new HashSet<>(newRoles); + rolesAdded.removeAll(oldRoles); + + if (!rolesRemoved.isEmpty()) { + sb.append("Relinquished role"); + if (rolesRemoved.size() != 1) { + sb.append("s"); + } + + sb.append(" ").append(rolesRemoved); + } + + if (!rolesAdded.isEmpty()) { + if (sb.length() > 0) { + sb.append("; "); + } + + sb.append("Acquired role"); + if (rolesAdded.size() != 1) { + sb.append("s"); + } + + sb.append(" ").append(rolesAdded); + } + } else { + sb.append("Node Status changed from ").append(oldStatus == null ? "[Unknown Node]" : oldStatus.getState().toString()).append(" to ").append(status.getState().toString()); + if (status.getState() == NodeConnectionState.CONNECTED) { + sb.append(" (Roles=").append(status.getRoles().toString()).append(")"); + } else if (status.getDisconnectReason() != null) { + sb.append(" due to ").append(status.getDisconnectReason()); + } else if (status.getDisconnectCode() != null) { + sb.append(" due to ").append(status.getDisconnectCode().toString()); + } + } + + return sb.toString(); + } + private void handleNodeStatusChange(final NodeStatusChangeMessage statusChangeMessage) { final NodeConnectionStatus updatedStatus = statusChangeMessage.getNodeConnectionStatus(); final NodeIdentifier nodeId = statusChangeMessage.getNodeId(); @@ -790,14 +840,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus); final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus(); - final StringBuilder sb = new StringBuilder(); - sb.append("Connection Status changed to ").append(status.getState().toString()); - if (status.getDisconnectReason() != null) { - sb.append(" due to ").append(status.getDisconnectReason()); - } else if (status.getDisconnectCode() != null) { - sb.append(" due to ").append(status.getDisconnectCode().toString()); + final String summary = summarizeStatusChange(oldStatus, status); + if (!StringUtils.isEmpty(summary)) { + addNodeEvent(nodeId, summary); } - addNodeEvent(nodeId, sb.toString()); // Update our counter so that we are in-sync with the cluster on the // most up-to-date version of the NodeConnectionStatus' Update Identifier. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 5de77f6006..264989527e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -565,7 +565,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS); - heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED))); + this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED); + heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false)); if (configuredForClustering) { leaderElectionManager = new CuratorLeaderElectionManager(4); @@ -1459,7 +1460,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } // update the heartbeat bean - this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus)); + this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary())); } finally { writeLock.unlock(); } @@ -3349,7 +3350,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } // update the heartbeat bean - this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus)); + this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary())); } finally { writeLock.unlock(); } @@ -3386,7 +3387,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R eventDrivenWorkerQueue.setPrimary(primary); // update the heartbeat bean - final HeartbeatBean oldBean = this.heartbeatBeanRef.getAndSet(new HeartbeatBean(rootGroup, primary, connectionStatus)); + final HeartbeatBean oldBean = this.heartbeatBeanRef.getAndSet(new HeartbeatBean(rootGroup, primary)); // Emit a bulletin detailing the fact that the primary node state has changed if (oldBean == null || oldBean.isPrimary() != primary) { @@ -3754,7 +3755,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public boolean isConnected() { rwLock.readLock().lock(); try { - return connectionStatus.getState() == NodeConnectionState.CONNECTED; + return connectionStatus != null && connectionStatus.getState() == NodeConnectionState.CONNECTED; } finally { rwLock.readLock().unlock(); } @@ -3766,7 +3767,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R this.connectionStatus = connectionStatus; // update the heartbeat bean - this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus)); + this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary())); } finally { rwLock.writeLock().unlock(); } @@ -3837,8 +3838,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R if (bean == null) { readLock.lock(); try { - final NodeConnectionStatus connectionStatus = new NodeConnectionStatus(getNodeId(), DisconnectionCode.NOT_YET_CONNECTED); - bean = new HeartbeatBean(getGroup(getRootGroupId()), isPrimary(), connectionStatus); + bean = new HeartbeatBean(getGroup(getRootGroupId()), isPrimary()); } finally { readLock.unlock(); } @@ -3868,7 +3868,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R roles.add(ClusterRoles.CLUSTER_COORDINATOR); } - final Heartbeat heartbeat = new Heartbeat(nodeId, roles, bean.getConnectionStatus(), hbPayload.marshal()); + final Heartbeat heartbeat = new Heartbeat(nodeId, roles, connectionStatus, hbPayload.marshal()); final HeartbeatMessage message = new HeartbeatMessage(); message.setHeartbeat(heartbeat); @@ -4002,12 +4002,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private static class HeartbeatBean { private final ProcessGroup rootGroup; private final boolean primary; - private final NodeConnectionStatus connectionStatus; - public HeartbeatBean(final ProcessGroup rootGroup, final boolean primary, final NodeConnectionStatus connectionStatus) { + public HeartbeatBean(final ProcessGroup rootGroup, final boolean primary) { this.rootGroup = rootGroup; this.primary = primary; - this.connectionStatus = connectionStatus; } public ProcessGroup getRootGroup() { @@ -4017,9 +4015,5 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public boolean isPrimary() { return primary; } - - public NodeConnectionStatus getConnectionStatus() { - return connectionStatus; - } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 4f286eb835..71d66b5416 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -16,6 +16,37 @@ */ package org.apache.nifi.controller; +import java.io.BufferedInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer; import org.apache.nifi.authorization.Authorizer; @@ -63,37 +94,6 @@ import org.apache.nifi.web.revision.RevisionManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.InetSocketAddress; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; - public class StandardFlowService implements FlowService, ProtocolHandler { private static final String EVENT_CATEGORY = "Controller"; @@ -444,7 +444,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { * the response will be null and we should load the local dataflow * and heartbeat until a manager is located. */ - final boolean localFlowEmpty = StandardFlowSynchronizer.isEmpty(proposedFlow, encryptor); + final boolean localFlowEmpty = StandardFlowSynchronizer.isEmpty(proposedFlow); final ConnectionResponse response = connect(localFlowEmpty, localFlowEmpty); // obtain write lock while we are updating the controller. We need to ensure that we don't diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index d6dee2c48f..bc28380af8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -124,7 +124,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { autoResumeState = NiFiProperties.getInstance().getAutoResumeState(); } - public static boolean isEmpty(final DataFlow dataFlow, final StringEncryptor encryptor) { + public static boolean isEmpty(final DataFlow dataFlow) { if (dataFlow == null || dataFlow.getFlow() == null || dataFlow.getFlow().length == 0) { return true; } @@ -135,7 +135,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0); final FlowEncodingVersion encodingVersion = FlowEncodingVersion.parse(rootGroupElement); - final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor, encodingVersion); + final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, null, encodingVersion); return isEmpty(rootGroupDto); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java index 1409df4b9d..2c51e96f27 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java @@ -260,7 +260,9 @@ public class FlowFromDOMFactory { dto.setProxyHost(getString(element, "proxyHost")); dto.setProxyPort(getOptionalInt(element, "proxyPort")); dto.setProxyUser(getString(element, "proxyUser")); - String proxyPassword = decrypt(getString(element, "proxyPassword"), encryptor); + + final String rawPassword = getString(element, "proxyPassword"); + final String proxyPassword = encryptor == null ? rawPassword : decrypt(rawPassword, encryptor); dto.setProxyPassword(proxyPassword); return dto; @@ -395,7 +397,9 @@ public class FlowFromDOMFactory { final List propertyNodeList = getChildrenByTagName(element, "property"); for (final Element propertyElement : propertyNodeList) { final String name = getString(propertyElement, "name"); - final String value = decrypt(getString(propertyElement, "value"), encryptor); + + final String rawPropertyValue = getString(propertyElement, "value"); + final String value = encryptor == null ? rawPropertyValue : decrypt(rawPropertyValue, encryptor); properties.put(name, value); } return properties; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java index 8b0d18f064..f73dce5562 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java @@ -80,8 +80,16 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD final FlowSynchronizer flowSynchronizer = new StandardFlowSynchronizer(encryptor); controller.synchronize(flowSynchronizer, dataFlow); - // save based on the controller, not the provided data flow because Process Groups may contain 'local' templates. - save(controller); + if (StandardFlowSynchronizer.isEmpty(dataFlow)) { + // If the dataflow is empty, we want to save it. We do this because when we start up a brand new cluster with no + // dataflow, we need to ensure that the flow is consistent across all nodes in the cluster and that upon restart + // of NiFi, the root group ID does not change. However, we don't always want to save it, because if the flow is + // not empty, then we can get into a bad situation, since the Processors, etc. don't have the appropriate "Scheduled + // State" yet (since they haven't yet been scheduled). So if there are components in the flow and we save it, we + // may end up saving the flow in such a way that all components are stopped. + // We save based on the controller, not the provided data flow because Process Groups may contain 'local' templates. + save(controller); + } } @Override