NIFI-2316, NIFI-2318: Ensure that we do not save the flow before initializing the Run Status of components. Clarify the Node Event messages

This closes #678

Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
Mark Payne 2016-07-19 14:49:38 -04:00 committed by jpercivall
parent 69586d8bd0
commit 52bc23f5db
6 changed files with 113 additions and 61 deletions

View File

@ -35,6 +35,7 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@ -770,6 +771,55 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
}
}
private String summarizeStatusChange(final NodeConnectionStatus oldStatus, final NodeConnectionStatus status) {
final StringBuilder sb = new StringBuilder();
if (oldStatus != null && status.getState() == oldStatus.getState()) {
// Check if roles changed
final Set<String> oldRoles = oldStatus.getRoles();
final Set<String> newRoles = status.getRoles();
final Set<String> rolesRemoved = new HashSet<>(oldRoles);
rolesRemoved.removeAll(newRoles);
final Set<String> rolesAdded = new HashSet<>(newRoles);
rolesAdded.removeAll(oldRoles);
if (!rolesRemoved.isEmpty()) {
sb.append("Relinquished role");
if (rolesRemoved.size() != 1) {
sb.append("s");
}
sb.append(" ").append(rolesRemoved);
}
if (!rolesAdded.isEmpty()) {
if (sb.length() > 0) {
sb.append("; ");
}
sb.append("Acquired role");
if (rolesAdded.size() != 1) {
sb.append("s");
}
sb.append(" ").append(rolesAdded);
}
} else {
sb.append("Node Status changed from ").append(oldStatus == null ? "[Unknown Node]" : oldStatus.getState().toString()).append(" to ").append(status.getState().toString());
if (status.getState() == NodeConnectionState.CONNECTED) {
sb.append(" (Roles=").append(status.getRoles().toString()).append(")");
} else if (status.getDisconnectReason() != null) {
sb.append(" due to ").append(status.getDisconnectReason());
} else if (status.getDisconnectCode() != null) {
sb.append(" due to ").append(status.getDisconnectCode().toString());
}
}
return sb.toString();
}
private void handleNodeStatusChange(final NodeStatusChangeMessage statusChangeMessage) {
final NodeConnectionStatus updatedStatus = statusChangeMessage.getNodeConnectionStatus();
final NodeIdentifier nodeId = statusChangeMessage.getNodeId();
@ -790,14 +840,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus();
final StringBuilder sb = new StringBuilder();
sb.append("Connection Status changed to ").append(status.getState().toString());
if (status.getDisconnectReason() != null) {
sb.append(" due to ").append(status.getDisconnectReason());
} else if (status.getDisconnectCode() != null) {
sb.append(" due to ").append(status.getDisconnectCode().toString());
final String summary = summarizeStatusChange(oldStatus, status);
if (!StringUtils.isEmpty(summary)) {
addNodeEvent(nodeId, summary);
}
addNodeEvent(nodeId, sb.toString());
// Update our counter so that we are in-sync with the cluster on the
// most up-to-date version of the NodeConnectionStatus' Update Identifier.

View File

@ -565,7 +565,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED)));
this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED);
heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false));
if (configuredForClustering) {
leaderElectionManager = new CuratorLeaderElectionManager(4);
@ -1459,7 +1460,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
// update the heartbeat bean
this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus));
this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary()));
} finally {
writeLock.unlock();
}
@ -3349,7 +3350,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
// update the heartbeat bean
this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus));
this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary()));
} finally {
writeLock.unlock();
}
@ -3386,7 +3387,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
eventDrivenWorkerQueue.setPrimary(primary);
// update the heartbeat bean
final HeartbeatBean oldBean = this.heartbeatBeanRef.getAndSet(new HeartbeatBean(rootGroup, primary, connectionStatus));
final HeartbeatBean oldBean = this.heartbeatBeanRef.getAndSet(new HeartbeatBean(rootGroup, primary));
// Emit a bulletin detailing the fact that the primary node state has changed
if (oldBean == null || oldBean.isPrimary() != primary) {
@ -3754,7 +3755,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public boolean isConnected() {
rwLock.readLock().lock();
try {
return connectionStatus.getState() == NodeConnectionState.CONNECTED;
return connectionStatus != null && connectionStatus.getState() == NodeConnectionState.CONNECTED;
} finally {
rwLock.readLock().unlock();
}
@ -3766,7 +3767,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
this.connectionStatus = connectionStatus;
// update the heartbeat bean
this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus));
this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary()));
} finally {
rwLock.writeLock().unlock();
}
@ -3837,8 +3838,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
if (bean == null) {
readLock.lock();
try {
final NodeConnectionStatus connectionStatus = new NodeConnectionStatus(getNodeId(), DisconnectionCode.NOT_YET_CONNECTED);
bean = new HeartbeatBean(getGroup(getRootGroupId()), isPrimary(), connectionStatus);
bean = new HeartbeatBean(getGroup(getRootGroupId()), isPrimary());
} finally {
readLock.unlock();
}
@ -3868,7 +3868,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
roles.add(ClusterRoles.CLUSTER_COORDINATOR);
}
final Heartbeat heartbeat = new Heartbeat(nodeId, roles, bean.getConnectionStatus(), hbPayload.marshal());
final Heartbeat heartbeat = new Heartbeat(nodeId, roles, connectionStatus, hbPayload.marshal());
final HeartbeatMessage message = new HeartbeatMessage();
message.setHeartbeat(heartbeat);
@ -4002,12 +4002,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private static class HeartbeatBean {
private final ProcessGroup rootGroup;
private final boolean primary;
private final NodeConnectionStatus connectionStatus;
public HeartbeatBean(final ProcessGroup rootGroup, final boolean primary, final NodeConnectionStatus connectionStatus) {
public HeartbeatBean(final ProcessGroup rootGroup, final boolean primary) {
this.rootGroup = rootGroup;
this.primary = primary;
this.connectionStatus = connectionStatus;
}
public ProcessGroup getRootGroup() {
@ -4017,9 +4015,5 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public boolean isPrimary() {
return primary;
}
public NodeConnectionStatus getConnectionStatus() {
return connectionStatus;
}
}
}

View File

@ -16,6 +16,37 @@
*/
package org.apache.nifi.controller;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
import org.apache.nifi.authorization.Authorizer;
@ -63,37 +94,6 @@ import org.apache.nifi.web.revision.RevisionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
public class StandardFlowService implements FlowService, ProtocolHandler {
private static final String EVENT_CATEGORY = "Controller";
@ -444,7 +444,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
* the response will be null and we should load the local dataflow
* and heartbeat until a manager is located.
*/
final boolean localFlowEmpty = StandardFlowSynchronizer.isEmpty(proposedFlow, encryptor);
final boolean localFlowEmpty = StandardFlowSynchronizer.isEmpty(proposedFlow);
final ConnectionResponse response = connect(localFlowEmpty, localFlowEmpty);
// obtain write lock while we are updating the controller. We need to ensure that we don't

View File

@ -124,7 +124,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
autoResumeState = NiFiProperties.getInstance().getAutoResumeState();
}
public static boolean isEmpty(final DataFlow dataFlow, final StringEncryptor encryptor) {
public static boolean isEmpty(final DataFlow dataFlow) {
if (dataFlow == null || dataFlow.getFlow() == null || dataFlow.getFlow().length == 0) {
return true;
}
@ -135,7 +135,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
final FlowEncodingVersion encodingVersion = FlowEncodingVersion.parse(rootGroupElement);
final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor, encodingVersion);
final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, null, encodingVersion);
return isEmpty(rootGroupDto);
}

View File

@ -260,7 +260,9 @@ public class FlowFromDOMFactory {
dto.setProxyHost(getString(element, "proxyHost"));
dto.setProxyPort(getOptionalInt(element, "proxyPort"));
dto.setProxyUser(getString(element, "proxyUser"));
String proxyPassword = decrypt(getString(element, "proxyPassword"), encryptor);
final String rawPassword = getString(element, "proxyPassword");
final String proxyPassword = encryptor == null ? rawPassword : decrypt(rawPassword, encryptor);
dto.setProxyPassword(proxyPassword);
return dto;
@ -395,7 +397,9 @@ public class FlowFromDOMFactory {
final List<Element> propertyNodeList = getChildrenByTagName(element, "property");
for (final Element propertyElement : propertyNodeList) {
final String name = getString(propertyElement, "name");
final String value = decrypt(getString(propertyElement, "value"), encryptor);
final String rawPropertyValue = getString(propertyElement, "value");
final String value = encryptor == null ? rawPropertyValue : decrypt(rawPropertyValue, encryptor);
properties.put(name, value);
}
return properties;

View File

@ -80,9 +80,17 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
final FlowSynchronizer flowSynchronizer = new StandardFlowSynchronizer(encryptor);
controller.synchronize(flowSynchronizer, dataFlow);
// save based on the controller, not the provided data flow because Process Groups may contain 'local' templates.
if (StandardFlowSynchronizer.isEmpty(dataFlow)) {
// If the dataflow is empty, we want to save it. We do this because when we start up a brand new cluster with no
// dataflow, we need to ensure that the flow is consistent across all nodes in the cluster and that upon restart
// of NiFi, the root group ID does not change. However, we don't always want to save it, because if the flow is
// not empty, then we can get into a bad situation, since the Processors, etc. don't have the appropriate "Scheduled
// State" yet (since they haven't yet been scheduled). So if there are components in the flow and we save it, we
// may end up saving the flow in such a way that all components are stopped.
// We save based on the controller, not the provided data flow because Process Groups may contain 'local' templates.
save(controller);
}
}
@Override
public synchronized void load(final OutputStream os) throws IOException {