NIFI-3981: When serializing flow to cluster, use the Scheduled State of ports as they are configured to be, not the current state, since the current state may change as soon as the FlowController has finished initializing

This closes #1861.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2017-05-25 15:31:43 -04:00 committed by Bryan Bende
parent 13b59b5621
commit 5cb928131c
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
4 changed files with 78 additions and 16 deletions

View File

@ -1518,7 +1518,29 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public void serialize(final FlowSerializer serializer, final OutputStream os) throws FlowSerializationException {
readLock.lock();
try {
final ScheduledStateLookup scheduledStateLookup = procNode -> startConnectablesAfterInitialization.contains(procNode) ? ScheduledState.RUNNING : procNode.getScheduledState();
final ScheduledStateLookup scheduledStateLookup = new ScheduledStateLookup() {
@Override
public ScheduledState getScheduledState(final ProcessorNode procNode) {
if (startConnectablesAfterInitialization.contains(procNode)) {
return ScheduledState.RUNNING;
}
return procNode.getScheduledState();
}
@Override
public ScheduledState getScheduledState(final Port port) {
if (startConnectablesAfterInitialization.contains(port)) {
return ScheduledState.RUNNING;
}
if (startRemoteGroupPortsAfterInitialization.contains(port)) {
return ScheduledState.RUNNING;
}
return port.getScheduledState();
}
};
serializer.serialize(this, os, scheduledStateLookup);
} finally {
readLock.unlock();
@ -2922,6 +2944,33 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
public void stopConnectable(final Connectable connectable) {
final ProcessGroup group = requireNonNull(connectable).getProcessGroup();
writeLock.lock();
try {
switch (requireNonNull(connectable).getConnectableType()) {
case FUNNEL:
// Ignore. We don't support stopping funnels.
break;
case INPUT_PORT:
case REMOTE_INPUT_PORT:
startConnectablesAfterInitialization.remove(connectable);
group.stopInputPort((Port) connectable);
break;
case OUTPUT_PORT:
case REMOTE_OUTPUT_PORT:
startConnectablesAfterInitialization.remove(connectable);
group.stopOutputPort((Port) connectable);
break;
default:
throw new IllegalArgumentException();
}
} finally {
writeLock.unlock();
}
}
public void startTransmitting(final RemoteGroupPort remoteGroupPort) {
writeLock.lock();
try {

View File

@ -774,19 +774,19 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
case DISABLED:
// switch processor do disabled. This means we have to stop it (if it's already stopped, this method does nothing),
// and then we have to disable it.
port.getProcessGroup().stopInputPort(port);
controller.stopConnectable(port);
port.getProcessGroup().disableInputPort(port);
break;
case RUNNING:
// we want to run now. Make sure processor is not disabled and then start it.
port.getProcessGroup().enableInputPort(port);
port.getProcessGroup().startInputPort(port);
controller.startConnectable(port);
break;
case STOPPED:
if (port.getScheduledState() == ScheduledState.DISABLED) {
port.getProcessGroup().enableInputPort(port);
} else if (port.getScheduledState() == ScheduledState.RUNNING) {
port.getProcessGroup().stopInputPort(port);
controller.stopConnectable(port);
}
break;
}
@ -803,19 +803,19 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
case DISABLED:
// switch processor do disabled. This means we have to stop it (if it's already stopped, this method does nothing),
// and then we have to disable it.
port.getProcessGroup().stopOutputPort(port);
controller.stopConnectable(port);
port.getProcessGroup().disableOutputPort(port);
break;
case RUNNING:
// we want to run now. Make sure processor is not disabled and then start it.
port.getProcessGroup().enableOutputPort(port);
port.getProcessGroup().startOutputPort(port);
controller.startConnectable(port);
break;
case STOPPED:
if (port.getScheduledState() == ScheduledState.DISABLED) {
port.getProcessGroup().enableOutputPort(port);
} else if (port.getScheduledState() == ScheduledState.RUNNING) {
port.getProcessGroup().stopOutputPort(port);
controller.stopConnectable(port);
}
break;
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.controller.serialization;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
@ -24,5 +25,17 @@ public interface ScheduledStateLookup {
ScheduledState getScheduledState(ProcessorNode procNode);
public static final ScheduledStateLookup IDENTITY_LOOKUP = ProcessorNode::getScheduledState;
ScheduledState getScheduledState(Port port);
public static final ScheduledStateLookup IDENTITY_LOOKUP = new ScheduledStateLookup() {
@Override
public ScheduledState getScheduledState(final ProcessorNode procNode) {
return procNode.getScheduledState();
}
@Override
public ScheduledState getScheduledState(final Port port) {
return port.getScheduledState();
}
};
}

View File

@ -159,19 +159,19 @@ public class StandardFlowSerializer implements FlowSerializer {
if (group.isRootGroup()) {
for (final Port port : group.getInputPorts()) {
addRootGroupPort(element, (RootGroupPort) port, "inputPort");
addRootGroupPort(element, (RootGroupPort) port, "inputPort", scheduledStateLookup);
}
for (final Port port : group.getOutputPorts()) {
addRootGroupPort(element, (RootGroupPort) port, "outputPort");
addRootGroupPort(element, (RootGroupPort) port, "outputPort", scheduledStateLookup);
}
} else {
for (final Port port : group.getInputPorts()) {
addPort(element, port, "inputPort");
addPort(element, port, "inputPort", scheduledStateLookup);
}
for (final Port port : group.getOutputPorts()) {
addPort(element, port, "outputPort");
addPort(element, port, "outputPort", scheduledStateLookup);
}
}
@ -330,7 +330,7 @@ public class StandardFlowSerializer implements FlowSerializer {
parentElement.appendChild(element);
}
private void addPort(final Element parentElement, final Port port, final String elementName) {
private void addPort(final Element parentElement, final Port port, final String elementName, final ScheduledStateLookup scheduledStateLookup) {
final Document doc = parentElement.getOwnerDocument();
final Element element = doc.createElement(elementName);
parentElement.appendChild(element);
@ -338,12 +338,12 @@ public class StandardFlowSerializer implements FlowSerializer {
addTextElement(element, "name", port.getName());
addPosition(element, port.getPosition());
addTextElement(element, "comments", port.getComments());
addTextElement(element, "scheduledState", port.getScheduledState().name());
addTextElement(element, "scheduledState", scheduledStateLookup.getScheduledState(port).name());
parentElement.appendChild(element);
}
private void addRootGroupPort(final Element parentElement, final RootGroupPort port, final String elementName) {
private void addRootGroupPort(final Element parentElement, final RootGroupPort port, final String elementName, final ScheduledStateLookup scheduledStateLookup) {
final Document doc = parentElement.getOwnerDocument();
final Element element = doc.createElement(elementName);
parentElement.appendChild(element);
@ -351,7 +351,7 @@ public class StandardFlowSerializer implements FlowSerializer {
addTextElement(element, "name", port.getName());
addPosition(element, port.getPosition());
addTextElement(element, "comments", port.getComments());
addTextElement(element, "scheduledState", port.getScheduledState().name());
addTextElement(element, "scheduledState", scheduledStateLookup.getScheduledState(port).name());
addTextElement(element, "maxConcurrentTasks", String.valueOf(port.getMaxConcurrentTasks()));
for (final String user : port.getUserAccessControl()) {
addTextElement(element, "userAccessControl", user);