NIFI-4075 Updating 'addRemoteProcessGroup' in StandardFlowSerializer to properly use ScheduledStateLookup. This closes #1922.

This commit is contained in:
Joe Percivall 2017-06-16 12:39:26 -04:00 committed by Mark Payne
parent aabd4a25d2
commit 77e49b749e
2 changed files with 14 additions and 12 deletions

View File

@ -188,7 +188,7 @@ public class StandardFlowSerializer implements FlowSerializer {
} }
for (final RemoteProcessGroup remoteRef : group.getRemoteProcessGroups()) { for (final RemoteProcessGroup remoteRef : group.getRemoteProcessGroups()) {
addRemoteProcessGroup(element, remoteRef); addRemoteProcessGroup(element, remoteRef, scheduledStateLookup);
} }
for (final Connection connection : group.getConnections()) { for (final Connection connection : group.getConnections()) {
@ -261,7 +261,7 @@ public class StandardFlowSerializer implements FlowSerializer {
addPosition(element, funnel.getPosition()); addPosition(element, funnel.getPosition());
} }
private void addRemoteProcessGroup(final Element parentElement, final RemoteProcessGroup remoteRef) { private void addRemoteProcessGroup(final Element parentElement, final RemoteProcessGroup remoteRef, final ScheduledStateLookup scheduledStateLookup) {
final Document doc = parentElement.getOwnerDocument(); final Document doc = parentElement.getOwnerDocument();
final Element element = doc.createElement("remoteProcessGroup"); final Element element = doc.createElement("remoteProcessGroup");
parentElement.appendChild(element); parentElement.appendChild(element);
@ -290,20 +290,20 @@ public class StandardFlowSerializer implements FlowSerializer {
for (final RemoteGroupPort port : remoteRef.getInputPorts()) { for (final RemoteGroupPort port : remoteRef.getInputPorts()) {
if (port.hasIncomingConnection()) { if (port.hasIncomingConnection()) {
addRemoteGroupPort(element, port, "inputPort"); addRemoteGroupPort(element, port, "inputPort", scheduledStateLookup);
} }
} }
for (final RemoteGroupPort port : remoteRef.getOutputPorts()) { for (final RemoteGroupPort port : remoteRef.getOutputPorts()) {
if (!port.getConnections().isEmpty()) { if (!port.getConnections().isEmpty()) {
addRemoteGroupPort(element, port, "outputPort"); addRemoteGroupPort(element, port, "outputPort", scheduledStateLookup);
} }
} }
parentElement.appendChild(element); parentElement.appendChild(element);
} }
private void addRemoteGroupPort(final Element parentElement, final RemoteGroupPort port, final String elementName) { private void addRemoteGroupPort(final Element parentElement, final RemoteGroupPort port, final String elementName, final ScheduledStateLookup scheduledStateLookup) {
final Document doc = parentElement.getOwnerDocument(); final Document doc = parentElement.getOwnerDocument();
final Element element = doc.createElement(elementName); final Element element = doc.createElement(elementName);
parentElement.appendChild(element); parentElement.appendChild(element);
@ -311,7 +311,7 @@ public class StandardFlowSerializer implements FlowSerializer {
addTextElement(element, "name", port.getName()); addTextElement(element, "name", port.getName());
addPosition(element, port.getPosition()); addPosition(element, port.getPosition());
addTextElement(element, "comments", port.getComments()); addTextElement(element, "comments", port.getComments());
addTextElement(element, "scheduledState", port.getScheduledState().name()); addTextElement(element, "scheduledState", scheduledStateLookup.getScheduledState(port).name());
addTextElement(element, "maxConcurrentTasks", port.getMaxConcurrentTasks()); addTextElement(element, "maxConcurrentTasks", port.getMaxConcurrentTasks());
addTextElement(element, "useCompression", String.valueOf(port.isUseCompression())); addTextElement(element, "useCompression", String.valueOf(port.isUseCompression()));
final Integer batchCount = port.getBatchCount(); final Integer batchCount = port.getBatchCount();

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.fingerprint; package org.apache.nifi.fingerprint;
import static org.apache.nifi.controller.serialization.ScheduledStateLookup.IDENTITY_LOOKUP;
import static org.apache.nifi.fingerprint.FingerprintFactory.FLOW_CONFIG_XSD; import static org.apache.nifi.fingerprint.FingerprintFactory.FLOW_CONFIG_XSD;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
@ -33,6 +34,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.connectable.Position; import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.serialization.FlowSerializer; import org.apache.nifi.controller.serialization.FlowSerializer;
import org.apache.nifi.controller.serialization.ScheduledStateLookup;
import org.apache.nifi.controller.serialization.StandardFlowSerializer; import org.apache.nifi.controller.serialization.StandardFlowSerializer;
import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup;
@ -169,7 +171,7 @@ public class FingerprintFactoryTest {
} }
private <T> Element serializeElement(final StringEncryptor encryptor, final Class<T> componentClass, final T component, private <T> Element serializeElement(final StringEncryptor encryptor, final Class<T> componentClass, final T component,
final String serializerMethodName) throws Exception { final String serializerMethodName, ScheduledStateLookup scheduledStateLookup) throws Exception {
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
final DocumentBuilder docBuilder = docFactory.newDocumentBuilder(); final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
@ -177,10 +179,10 @@ public class FingerprintFactoryTest {
final FlowSerializer flowSerializer = new StandardFlowSerializer(encryptor); final FlowSerializer flowSerializer = new StandardFlowSerializer(encryptor);
final Method serializeMethod = StandardFlowSerializer.class.getDeclaredMethod(serializerMethodName, final Method serializeMethod = StandardFlowSerializer.class.getDeclaredMethod(serializerMethodName,
Element.class, componentClass); Element.class, componentClass, ScheduledStateLookup.class);
serializeMethod.setAccessible(true); serializeMethod.setAccessible(true);
final Element rootElement = doc.createElement("root"); final Element rootElement = doc.createElement("root");
serializeMethod.invoke(flowSerializer, rootElement, component); serializeMethod.invoke(flowSerializer, rootElement, component, scheduledStateLookup);
return rootElement; return rootElement;
} }
@ -234,7 +236,7 @@ public class FingerprintFactoryTest {
"NO_VALUE" + "NO_VALUE" +
"NO_VALUE"; "NO_VALUE";
final Element rootElement = serializeElement(encryptor, RemoteProcessGroup.class, component, "addRemoteProcessGroup"); final Element rootElement = serializeElement(encryptor, RemoteProcessGroup.class, component, "addRemoteProcessGroup", IDENTITY_LOOKUP);
final Element componentElement = (Element) rootElement.getElementsByTagName("remoteProcessGroup").item(0); final Element componentElement = (Element) rootElement.getElementsByTagName("remoteProcessGroup").item(0);
assertEquals(expected, fingerprint("addRemoteProcessGroupFingerprint", Element.class, componentElement)); assertEquals(expected, fingerprint("addRemoteProcessGroupFingerprint", Element.class, componentElement));
@ -271,7 +273,7 @@ public class FingerprintFactoryTest {
"proxy-user" + "proxy-user" +
"proxy-pass"; "proxy-pass";
final Element rootElement = serializeElement(encryptor, RemoteProcessGroup.class, component, "addRemoteProcessGroup"); final Element rootElement = serializeElement(encryptor, RemoteProcessGroup.class, component, "addRemoteProcessGroup", IDENTITY_LOOKUP);
final Element componentElement = (Element) rootElement.getElementsByTagName("remoteProcessGroup").item(0); final Element componentElement = (Element) rootElement.getElementsByTagName("remoteProcessGroup").item(0);
assertEquals(expected.toString(), fingerprint("addRemoteProcessGroupFingerprint", Element.class, componentElement)); assertEquals(expected.toString(), fingerprint("addRemoteProcessGroupFingerprint", Element.class, componentElement));
} }
@ -310,7 +312,7 @@ public class FingerprintFactoryTest {
"64KB" + "64KB" +
"10sec"; "10sec";
final Element rootElement = serializeElement(encryptor, RemoteProcessGroup.class, groupComponent, "addRemoteProcessGroup"); final Element rootElement = serializeElement(encryptor, RemoteProcessGroup.class, groupComponent, "addRemoteProcessGroup", IDENTITY_LOOKUP);
final Element componentElement = (Element) rootElement.getElementsByTagName("inputPort").item(0); final Element componentElement = (Element) rootElement.getElementsByTagName("inputPort").item(0);
assertEquals(expected.toString(), fingerprint("addRemoteGroupPortFingerprint", Element.class, componentElement)); assertEquals(expected.toString(), fingerprint("addRemoteGroupPortFingerprint", Element.class, componentElement));
} }