NIFI-3163: Add new RPG settings to fingerprint

- Added timeout, yieldPeriod, transportProtocol, proxyHost, proxyPort,
  proxyUser and proxyPassword

- Removed unused fingerprint methods

FingerprintFactory has two types of fingerprinting method, from XML
elements and from DTO. However, the ones from DTO are not used by
anywhere. IDE didn't report those private methods unused because
addProcessGroupFingerprint and addSnippetFingerprint call each other,
but those are not used from outside actuallly.

This commit removes those private methods to keep the class clean to
avoid unnecessary code maintenance and tests.

This closes #1332.
This commit is contained in:
Koji Kawamura 2016-12-14 19:21:45 +09:00 committed by Mark Payne
parent 90ff275bb9
commit 27afd2e0cc
2 changed files with 147 additions and 316 deletions

View File

@ -28,19 +28,7 @@ import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.util.DomUtils;
import org.apache.nifi.web.api.dto.ComponentDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.FunnelDTO;
import org.apache.nifi.web.api.dto.LabelDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -64,17 +52,21 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
/**
* Creates a fingerprint of a flow.xml. The order of elements or attributes in the flow.xml does not influence the fingerprint generation.
* <p>Creates a fingerprint of a flow.xml. The order of elements or attributes in the flow.xml does not influence the fingerprint generation.
*
* Only items in the flow.xml that influence the processing of data are incorporated into the fingerprint. Examples of items involved in the fingerprint are: processor IDs, processor relationships,
* and processor properties. Examples of items not involved in the fingerprint are: items in the processor "settings" or "comments" tabs, position information, flow controller settings, and counters.
* <p>Only items in the flow.xml that influence the processing of data are incorporated into the fingerprint.
* Examples of items involved in the fingerprint are: processor IDs, processor relationships, and processor properties.
* Examples of items not involved in the fingerprint are: items in the processor "comments" tabs, position information, flow controller settings, and counters.
*
* <p>The determination for making items into the fingerprint is whether we can
* easily change the setting in order to inherit the cluster's flow.
* For example, if the component has to be stopped to apply the change and started again,
* then the item should be included in a fingerprint.
*/
public class FingerprintFactory {
@ -272,135 +264,6 @@ public class FingerprintFactory {
return builder;
}
private StringBuilder addSnippetFingerprint(final StringBuilder builder, final FlowSnippetDTO snippet, final FlowController controller) {
final Comparator<ComponentDTO> componentComparator = new Comparator<ComponentDTO>() {
@Override
public int compare(final ComponentDTO o1, final ComponentDTO o2) {
if (o1 == null && o2 == null) {
return 0;
}
if (o1 == null) {
return 1;
}
if (o2 == null) {
return -1;
}
return o1.getId().compareTo(o2.getId());
}
};
final Set<ConnectionDTO> connections = snippet.getConnections();
if (connections == null || connections.isEmpty()) {
builder.append("NO_CONNECTIONS");
} else {
final List<ConnectionDTO> sortedConnections = new ArrayList<>(connections);
Collections.sort(sortedConnections, componentComparator);
for (final ConnectionDTO connection : sortedConnections) {
addConnectionFingerprint(builder, connection);
}
}
final Set<FunnelDTO> funnels = snippet.getFunnels();
if (funnels == null || funnels.isEmpty()) {
builder.append("NO_FUNNELS");
} else {
final List<FunnelDTO> sortedFunnels = new ArrayList<>(funnels);
Collections.sort(sortedFunnels, componentComparator);
for (final FunnelDTO funnel : sortedFunnels) {
addFunnelFingerprint(builder, funnel);
}
}
final Set<PortDTO> inputPorts = snippet.getInputPorts();
if (inputPorts == null || inputPorts.isEmpty()) {
builder.append("NO_INPUT_PORTS");
} else {
final List<PortDTO> sortedInputPorts = new ArrayList<>(inputPorts);
Collections.sort(sortedInputPorts, componentComparator);
for (final PortDTO port : sortedInputPorts) {
addPortFingerprint(builder, port);
}
}
final Set<PortDTO> outputPorts = snippet.getOutputPorts();
if (outputPorts == null || outputPorts.isEmpty()) {
builder.append("NO_OUTPUT_PORTS");
} else {
final List<PortDTO> sortedOutputPorts = new ArrayList<>(outputPorts);
Collections.sort(sortedOutputPorts, componentComparator);
for (final PortDTO port : sortedOutputPorts) {
addPortFingerprint(builder, port);
}
}
final Set<LabelDTO> labels = snippet.getLabels();
if (labels == null || labels.isEmpty()) {
builder.append("NO_LABELS");
} else {
final List<LabelDTO> sortedLabels = new ArrayList<>(labels);
Collections.sort(sortedLabels, componentComparator);
for (final LabelDTO label : sortedLabels) {
addLabelFingerprint(builder, label);
}
}
final Set<ProcessGroupDTO> procGroups = snippet.getProcessGroups();
if (procGroups == null || procGroups.isEmpty()) {
builder.append("NO_PROCESS_GROUPS");
} else {
final List<ProcessGroupDTO> sortedProcGroups = new ArrayList<>(procGroups);
Collections.sort(sortedProcGroups, componentComparator);
for (final ProcessGroupDTO procGroup : sortedProcGroups) {
addProcessGroupFingerprint(builder, procGroup, controller);
}
}
final Set<ProcessorDTO> processors = snippet.getProcessors();
if (processors == null || processors.isEmpty()) {
builder.append("NO_PROCESSORS");
} else {
final List<ProcessorDTO> sortedProcessors = new ArrayList<>(processors);
Collections.sort(sortedProcessors, componentComparator);
for (final ProcessorDTO proc : sortedProcessors) {
addProcessorFingerprint(builder, proc, controller);
}
}
final Set<RemoteProcessGroupDTO> remoteGroups = snippet.getRemoteProcessGroups();
if (remoteGroups == null || remoteGroups.isEmpty()) {
builder.append("NO_REMOTE_PROCESS_GROUPS");
} else {
final List<RemoteProcessGroupDTO> sortedRemoteGroups = new ArrayList<>(remoteGroups);
Collections.sort(sortedRemoteGroups, componentComparator);
for (final RemoteProcessGroupDTO remoteGroup : sortedRemoteGroups) {
addRemoteProcessGroupFingerprint(builder, remoteGroup);
}
}
final Set<ControllerServiceDTO> services = snippet.getControllerServices();
if (services == null || services.isEmpty()) {
builder.append("NO_CONTROLLER_SERVICES");
} else {
final List<ControllerServiceDTO> sortedServices = new ArrayList<>(services);
Collections.sort(sortedServices, componentComparator);
for (final ControllerServiceDTO service : sortedServices) {
addControllerServiceFingerprint(builder, service, controller);
}
}
return builder;
}
private StringBuilder addProcessGroupFingerprint(final StringBuilder builder, final Element processGroupElem, final FlowController controller) throws FingerprintException {
// id
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "id"));
@ -464,16 +327,6 @@ public class FingerprintFactory {
return builder;
}
private StringBuilder addProcessGroupFingerprint(final StringBuilder builder, final ProcessGroupDTO group, final FlowController controller) {
builder.append(group.getId());
builder.append(group.getName());
builder.append(group.getParentGroupId());
final FlowSnippetDTO contents = group.getContents();
addSnippetFingerprint(builder, contents, controller);
return builder;
}
private StringBuilder addFlowFileProcessorFingerprint(final StringBuilder builder, final Element processorElem, final FlowController controller) throws FingerprintException {
// id
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "id"));
@ -521,51 +374,6 @@ public class FingerprintFactory {
return builder;
}
private StringBuilder addProcessorFingerprint(final StringBuilder builder, final ProcessorDTO processor, final FlowController controller) {
final ProcessorConfigDTO config = processor.getConfig();
builder.append(processor.getId());
builder.append(processor.getClass().getName());
builder.append(processor.getName());
builder.append(config.getBulletinLevel());
builder.append(config.getComments());
builder.append(config.getSchedulingPeriod());
builder.append(config.getSchedulingStrategy());
builder.append(config.getExecutionNode());
builder.append(config.getYieldDuration());
builder.append(config.getConcurrentlySchedulableTaskCount());
builder.append(config.getPenaltyDuration());
builder.append(config.getAnnotationData());
// create an instance of the Processor so that we know the default property values
Processor processorInstance = null;
try {
if (controller != null) {
processorInstance = controller.createProcessor(processor.getType(), UUID.randomUUID().toString(), false).getProcessor();
}
} catch (ProcessorInstantiationException e) {
logger.warn("Unable to create Processor of type {} due to {}; its default properties will be fingerprinted instead of being ignored.", processor.getType(), e.toString());
if (logger.isDebugEnabled()) {
logger.warn("", e);
}
}
addPropertiesFingerprint(builder, processorInstance, config.getProperties());
final Set<String> autoTerm = config.getAutoTerminatedRelationships();
if (autoTerm == null || autoTerm.isEmpty()) {
builder.append("NO_AUTO_TERMINATED_RELATIONSHIPS");
} else {
final List<String> sortedAutoTerm = new ArrayList<>(autoTerm);
Collections.sort(sortedAutoTerm);
for (final String rel : sortedAutoTerm) {
builder.append(rel);
}
}
return builder;
}
private StringBuilder addPropertyFingerprint(final StringBuilder builder, final ConfigurableComponent component, final String propName, final String propValue) throws FingerprintException {
// If we have a component to use, first determine if the value given is the default value for the specified property.
// If so, we do not add the property to the fingerprint.
@ -636,50 +444,23 @@ public class FingerprintFactory {
return builder;
}
private StringBuilder addPortFingerprint(final StringBuilder builder, final PortDTO port) {
builder.append(port.getId());
builder.append(port.getName());
final Set<String> userAccessControl = port.getUserAccessControl();
if (userAccessControl == null || userAccessControl.isEmpty()) {
builder.append("NO_USER_ACCESS_CONTROL");
} else {
final List<String> sortedAccessControl = new ArrayList<>(userAccessControl);
Collections.sort(sortedAccessControl);
for (final String user : sortedAccessControl) {
builder.append(user);
}
}
final Set<String> groupAccessControl = port.getGroupAccessControl();
if (groupAccessControl == null || groupAccessControl.isEmpty()) {
builder.append("NO_GROUP_ACCESS_CONTROL");
} else {
final List<String> sortedAccessControl = new ArrayList<>(groupAccessControl);
Collections.sort(sortedAccessControl);
for (final String user : sortedAccessControl) {
builder.append(user);
}
}
return builder;
}
private StringBuilder addLabelFingerprint(final StringBuilder builder, final Element labelElem) {
appendFirstValue(builder, DomUtils.getChildNodesByTagName(labelElem, "id"));
appendFirstValue(builder, DomUtils.getChildNodesByTagName(labelElem, "value"));
return builder;
}
private StringBuilder addLabelFingerprint(final StringBuilder builder, final LabelDTO label) {
builder.append(label.getId());
builder.append(label.getLabel());
return builder;
}
private StringBuilder addRemoteProcessGroupFingerprint(final StringBuilder builder, final Element remoteProcessGroupElem) throws FingerprintException {
appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteProcessGroupElem, "id"));
appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteProcessGroupElem, "url"));
appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteProcessGroupElem, "networkInterface"));
for (String tagName : new String[]{"id", "urls", "networkInterface", "timeout", "yieldPeriod",
"transportProtocol", "proxyHost", "proxyPort", "proxyUser", "proxyPassword"}) {
final String value = getFirstValue(DomUtils.getChildNodesByTagName(remoteProcessGroupElem, tagName));
if (isEncrypted(value)) {
builder.append(decrypt(value));
} else {
builder.append(value);
}
}
final NodeList inputPortList = DomUtils.getChildNodesByTagName(remoteProcessGroupElem, "inputPort");
final NodeList outputPortList = DomUtils.getChildNodesByTagName(remoteProcessGroupElem, "outputPort");
@ -745,59 +526,6 @@ public class FingerprintFactory {
return builder;
}
private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder builder, final RemoteProcessGroupPortDTO port) {
builder.append(port.getId());
builder.append(port.getConcurrentlySchedulableTaskCount());
builder.append(port.getUseCompression());
return builder;
}
private StringBuilder addRemoteProcessGroupFingerprint(final StringBuilder builder, final RemoteProcessGroupDTO remoteGroup) {
builder.append(remoteGroup.getId());
builder.append(remoteGroup.getTargetUri());
final Comparator<RemoteProcessGroupPortDTO> comparator = new Comparator<RemoteProcessGroupPortDTO>() {
@Override
public int compare(RemoteProcessGroupPortDTO o1, RemoteProcessGroupPortDTO o2) {
if (o1 == null && o2 == null) {
return 0;
}
if (o1 == null) {
return 1;
}
if (o2 == null) {
return -1;
}
return o1.getName().compareTo(o2.getName());
}
};
final RemoteProcessGroupContentsDTO contents = remoteGroup.getContents();
if (contents != null) {
if (contents.getInputPorts() != null) {
final List<RemoteProcessGroupPortDTO> sortedInputPorts = new ArrayList<>(contents.getInputPorts());
Collections.sort(sortedInputPorts, comparator);
for (final RemoteProcessGroupPortDTO port : sortedInputPorts) {
if (port.isConnected()) {
addRemoteGroupPortFingerprint(builder, port);
}
}
}
if (contents.getOutputPorts() != null) {
final List<RemoteProcessGroupPortDTO> sortedOutputPorts = new ArrayList<>(contents.getOutputPorts());
Collections.sort(sortedOutputPorts, comparator);
for (final RemoteProcessGroupPortDTO port : sortedOutputPorts) {
if (port.isConnected()) {
addRemoteGroupPortFingerprint(builder, port);
}
}
}
}
return builder;
}
private StringBuilder addConnectionFingerprint(final StringBuilder builder, final Element connectionElem) throws FingerprintException {
// id
@ -821,26 +549,7 @@ public class FingerprintFactory {
final NodeList relationshipElems = DomUtils.getChildNodesByTagName(connectionElem, "relationship");
final List<Element> sortedRelationshipElems = sortElements(relationshipElems, getConnectionRelationshipsComparator());
for (final Element relationshipElem : sortedRelationshipElems) {
builder.append(getValue(relationshipElem, "NO_VALUE"));
}
return builder;
}
private StringBuilder addConnectionFingerprint(final StringBuilder builder, final ConnectionDTO connection) throws FingerprintException {
builder.append(connection.getId());
builder.append(connection.getSource().getId());
builder.append(connection.getSource().getGroupId());
builder.append(connection.getSource().getType());
builder.append(connection.getDestination().getId());
builder.append(connection.getDestination().getGroupId());
builder.append(connection.getDestination().getType());
if (connection.getSelectedRelationships() != null) {
final List<String> sortedSelectedRelationships = new ArrayList<>(connection.getSelectedRelationships());
Collections.sort(sortedSelectedRelationships);
for (final String rel : sortedSelectedRelationships) {
builder.append(rel);
}
builder.append(getValue(relationshipElem, NO_VALUE));
}
return builder;
@ -852,11 +561,6 @@ public class FingerprintFactory {
return builder;
}
private StringBuilder addFunnelFingerprint(final StringBuilder builder, final FunnelDTO funnel) {
builder.append(funnel.getId());
return builder;
}
private void addControllerServiceFingerprint(final StringBuilder builder, final ControllerServiceDTO dto, final FlowController controller) {
builder.append(dto.getId());
builder.append(dto.getType());

View File

@ -20,11 +20,22 @@ import static org.apache.nifi.fingerprint.FingerprintFactory.FLOW_CONFIG_XSD;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.lang.reflect.Method;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.serialization.FlowSerializer;
import org.apache.nifi.controller.serialization.StandardFlowSerializer;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Before;
import org.junit.Test;
import org.xml.sax.ErrorHandler;
@ -37,15 +48,22 @@ import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
/**
*/
public class FingerprintFactoryTest {
private NiFiProperties nifiProperties;
private StringEncryptor encryptor;
private FingerprintFactory fingerprinter;
@Before
public void setup() {
fingerprinter = new FingerprintFactory(null);
nifiProperties = getNiFiProperties();
encryptor = StringEncryptor.createEncryptor(nifiProperties);
fingerprinter = new FingerprintFactory(encryptor);
}
@Test
@ -113,4 +131,113 @@ public class FingerprintFactoryTest {
throw new RuntimeException("Failed to create document builder for flow configuration.", e);
}
}
private <T> Element serializeElement(final StringEncryptor encryptor, final Class<T> componentClass, final T component,
final String serializerMethodName) throws Exception {
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
final Document doc = docBuilder.newDocument();
final FlowSerializer flowSerializer = new StandardFlowSerializer(encryptor);
final Method serializeMethod = StandardFlowSerializer.class.getDeclaredMethod(serializerMethodName,
Element.class, componentClass);
serializeMethod.setAccessible(true);
final Element rootElement = doc.createElement("root");
serializeMethod.invoke(flowSerializer, rootElement, component);
return rootElement;
}
private NiFiProperties getNiFiProperties() {
final NiFiProperties nifiProperties = mock(NiFiProperties.class);
when(nifiProperties.getProperty(StringEncryptor.NF_SENSITIVE_PROPS_ALGORITHM)).thenReturn("PBEWITHMD5AND256BITAES-CBC-OPENSSL");
when(nifiProperties.getProperty(StringEncryptor.NF_SENSITIVE_PROPS_PROVIDER)).thenReturn("BC");
when(nifiProperties.getProperty(anyString(), anyString())).then(invocation -> invocation.getArgumentAt(1, String.class));
return nifiProperties;
}
private <T> String fingerprint(final String methodName, final Class<T> inputClass, final T input) throws Exception {
final Method fingerprintFromComponent = FingerprintFactory.class.getDeclaredMethod(methodName,
StringBuilder.class, inputClass);
fingerprintFromComponent.setAccessible(true);
final StringBuilder fingerprint = new StringBuilder();
fingerprintFromComponent.invoke(fingerprinter, fingerprint, input);
return fingerprint.toString();
}
@Test
public void testRemoteProcessGroupFingerprintRaw() throws Exception {
// Fill out every configuration.
final RemoteProcessGroup component = mock(RemoteProcessGroup.class);
when(component.getName()).thenReturn("name");
when(component.getIdentifier()).thenReturn("id");
when(component.getPosition()).thenReturn(new Position(10.5, 20.3));
when(component.getTargetUri()).thenReturn("http://node1:8080/nifi");
when(component.getTargetUris()).thenReturn("http://node1:8080/nifi, http://node2:8080/nifi");
when(component.getNetworkInterface()).thenReturn("eth0");
when(component.getComments()).thenReturn("comment");
when(component.getCommunicationsTimeout()).thenReturn("10 sec");
when(component.getYieldDuration()).thenReturn("30 sec");
when(component.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.RAW);
when(component.getProxyHost()).thenReturn(null);
when(component.getProxyPort()).thenReturn(null);
when(component.getProxyUser()).thenReturn(null);
when(component.getProxyPassword()).thenReturn(null);
// Assert fingerprints with expected one.
final String expected = "id" +
"http://node1:8080/nifi, http://node2:8080/nifi" +
"eth0" +
"10 sec" +
"30 sec" +
"RAW" +
"NO_VALUE" +
"NO_VALUE" +
"NO_VALUE" +
"NO_VALUE";
final Element rootElement = serializeElement(encryptor, RemoteProcessGroup.class, component, "addRemoteProcessGroup");
final Element componentElement = (Element) rootElement.getElementsByTagName("remoteProcessGroup").item(0);
assertEquals(expected, fingerprint("addRemoteProcessGroupFingerprint", Element.class, componentElement));
}
@Test
public void testRemoteProcessGroupFingerprintWithProxy() throws Exception {
// Fill out every configuration.
final RemoteProcessGroup component = mock(RemoteProcessGroup.class);
when(component.getName()).thenReturn("name");
when(component.getIdentifier()).thenReturn("id");
when(component.getPosition()).thenReturn(new Position(10.5, 20.3));
when(component.getTargetUri()).thenReturn("http://node1:8080/nifi");
when(component.getTargetUris()).thenReturn("http://node1:8080/nifi, http://node2:8080/nifi");
when(component.getComments()).thenReturn("comment");
when(component.getCommunicationsTimeout()).thenReturn("10 sec");
when(component.getYieldDuration()).thenReturn("30 sec");
when(component.getTransportProtocol()).thenReturn(SiteToSiteTransportProtocol.HTTP);
when(component.getProxyHost()).thenReturn("proxy-host");
when(component.getProxyPort()).thenReturn(3128);
when(component.getProxyUser()).thenReturn("proxy-user");
when(component.getProxyPassword()).thenReturn("proxy-pass");
// Assert fingerprints with expected one.
final String expected = "id" +
"http://node1:8080/nifi, http://node2:8080/nifi" +
"NO_VALUE" +
"10 sec" +
"30 sec" +
"HTTP" +
"proxy-host" +
"3128" +
"proxy-user" +
"proxy-pass";
final Element rootElement = serializeElement(encryptor, RemoteProcessGroup.class, component, "addRemoteProcessGroup");
final Element componentElement = (Element) rootElement.getElementsByTagName("remoteProcessGroup").item(0);
assertEquals(expected.toString(), fingerprint("addRemoteProcessGroupFingerprint", Element.class, componentElement));
}
}