NIFI-10048 Corrected StandardFlowDifference.hashCode() to avoid NPE

- Added tests to TestFlowController for JSON flow configuration

This closes #6227

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Emilio Setiadarma 2022-07-27 13:19:35 -07:00 committed by exceptionfactory
parent 67c463627c
commit 296f308cdc
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
3 changed files with 280 additions and 35 deletions

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.controller;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.admin.service.AuditService;
@ -33,11 +35,15 @@ import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.flow.VersionedDataflow;
import org.apache.nifi.controller.flow.VersionedFlowEncodingVersion;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
import org.apache.nifi.controller.serialization.FlowSynchronizer;
import org.apache.nifi.controller.serialization.StandardFlowSynchronizer;
import org.apache.nifi.controller.serialization.VersionedFlowSynchronizer;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.mock.DummyProcessor;
@ -47,6 +53,9 @@ import org.apache.nifi.controller.service.mock.ServiceB;
import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.encrypt.PropertyEncryptorFactory;
import org.apache.nifi.flow.ComponentType;
import org.apache.nifi.flow.Position;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.groups.BundleUpdateStrategy;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.LogLevel;
@ -59,6 +68,7 @@ import org.apache.nifi.nar.SystemBundle;
import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.persistence.FlowConfigurationArchiveManager;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.MockProvenanceRepository;
import org.apache.nifi.registry.VariableRegistry;
@ -125,6 +135,7 @@ public class TestFlowController {
private VariableRegistry variableRegistry;
private ExtensionDiscoveringManager extensionManager;
private StatusHistoryRepository statusHistoryRepository;
private FlowSynchronizer standardFlowSynchronizer;
@Before
public void setup() {
@ -188,6 +199,11 @@ public class TestFlowController {
bulletinRepo = mock(BulletinRepository.class);
controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer,
auditService, encryptor, bulletinRepo, variableRegistry, mock(FlowRegistryClient.class), extensionManager, statusHistoryRepository);
final XmlFlowSynchronizer xmlFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final VersionedFlowSynchronizer versionedFlowSynchronizer = new VersionedFlowSynchronizer(extensionManager,
nifiProperties.getFlowConfigurationJsonFile(), new FlowConfigurationArchiveManager(nifiProperties));
standardFlowSynchronizer = new StandardFlowSynchronizer(xmlFlowSynchronizer, versionedFlowSynchronizer);
}
@After
@ -198,8 +214,6 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWithReportingTaskAndProcessorReferencingControllerService() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
final String authFingerprint = authorizer.getFingerprint();
final File flowFile = new File("src/test/resources/conf/reporting-task-with-cs-flow-0.7.0.xml");
@ -259,8 +273,6 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWithProcessorReferencingControllerService() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
final String authFingerprint = authorizer.getFingerprint();
final File flowFile = new File("src/test/resources/conf/processor-with-cs-flow-0.7.0.xml");
@ -300,8 +312,6 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWhenAuthorizationsAreEqual() {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
final String authFingerprint = authorizer.getFingerprint();
final DataFlow proposedDataFlow = mock(DataFlow.class);
@ -314,8 +324,6 @@ public class TestFlowController {
@Test(expected = UninheritableFlowException.class)
public void testSynchronizeFlowWhenAuthorizationsAreDifferent() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final File flowFile = new File("src/test/resources/conf/processor-with-cs-flow-0.7.0.xml");
final String flow = IOUtils.toString(new FileInputStream(flowFile), StandardCharsets.UTF_8);
@ -335,8 +343,6 @@ public class TestFlowController {
@Test(expected = FlowSynchronizationException.class)
public void testSynchronizeFlowWithInvalidParameterContextReference() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final File flowFile = new File("src/test/resources/conf/parameter-context-flow-error.xml");
final String flow = IOUtils.toString(new FileInputStream(flowFile), StandardCharsets.UTF_8);
@ -353,8 +359,6 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWithNestedParameterContexts() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final File flowFile = new File("src/test/resources/conf/parameter-context-flow.xml");
final String flow = IOUtils.toString(new FileInputStream(flowFile), StandardCharsets.UTF_8);
@ -377,8 +381,6 @@ public class TestFlowController {
@Test
public void testCreateParameterContextWithAndWithoutValidation() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final File flowFile = new File("src/test/resources/conf/parameter-context-flow.xml");
final String flow = IOUtils.toString(new FileInputStream(flowFile), StandardCharsets.UTF_8);
@ -425,8 +427,6 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWhenAuthorizationsAreDifferentAndFlowEmpty() {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
// create a mock proposed data flow with different auth fingerprint as the current authorizer
final String authFingerprint = "<authorizations></authorizations>";
final DataFlow proposedDataFlow = mock(DataFlow.class);
@ -442,8 +442,6 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWhenProposedAuthorizationsAreNull() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final File flowFile = new File("src/test/resources/conf/processor-with-cs-flow-0.7.0.xml");
final String flow = IOUtils.toString(new FileInputStream(flowFile), StandardCharsets.UTF_8);
@ -467,8 +465,6 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWhenProposedAuthorizationsAreNullAndEmptyFlow() {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final DataFlow proposedDataFlow = mock(DataFlow.class);
when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(null);
@ -498,8 +494,6 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWhenCurrentAuthorizationsAreEmptyAndProposedAreNot() {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
// create a mock proposed data flow with the same auth fingerprint as the current authorizer
final String authFingerprint = authorizer.getFingerprint();
final DataFlow proposedDataFlow = mock(DataFlow.class);
@ -510,15 +504,13 @@ public class TestFlowController {
controller.shutdown(true);
controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer,
auditService, encryptor, bulletinRepo, variableRegistry, mock(FlowRegistryClient.class), extensionManager, statusHistoryRepository);
auditService, encryptor, bulletinRepo, variableRegistry, mock(FlowRegistryClient.class), extensionManager, statusHistoryRepository);
controller.synchronize(standardFlowSynchronizer, proposedDataFlow, mock(FlowService.class), BundleUpdateStrategy.IGNORE_BUNDLE);
assertEquals(authFingerprint, authorizer.getFingerprint());
}
@Test
public void testSynchronizeFlowWhenProposedMissingComponentsAreDifferent() {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final Set<String> missingComponents = new HashSet<>();
missingComponents.add("1");
missingComponents.add("2");
@ -536,9 +528,6 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWhenExistingMissingComponentsAreDifferent() throws IOException {
final PropertyEncryptor encryptor = PropertyEncryptorFactory.getPropertyEncryptor(nifiProperties);
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final ProcessorNode mockProcessorNode = mock(ProcessorNode.class);
when(mockProcessorNode.getIdentifier()).thenReturn("1");
when(mockProcessorNode.isExtensionMissing()).thenReturn(true);
@ -581,8 +570,6 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWhenBundlesAreSame() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final LogRepository logRepository = LogRepositoryFactory.getRepository("d89ada5d-35fb-44ff-83f1-4cc00b48b2df");
logRepository.removeAllObservers();
@ -592,8 +579,6 @@ public class TestFlowController {
@Test
public void testSynchronizeFlowWhenBundlesAreDifferent() throws IOException {
final FlowSynchronizer standardFlowSynchronizer = new XmlFlowSynchronizer(nifiProperties, extensionManager);
final LogRepository logRepository = LogRepositoryFactory.getRepository("d89ada5d-35fb-44ff-83f1-4cc00b48b2df");
logRepository.removeAllObservers();
@ -1184,4 +1169,103 @@ public class TestFlowController {
assertEquals(1, controller.getFlowManager().getRootGroup().getControllerServices(false).size());
}
}
@Test
public void testSynchronizeNewJsonFlow() throws IOException {
final String authFingerprint = authorizer.getFingerprint();
final String flow = getNewJsonFlow();
final DataFlow proposedDataFlow = new StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8),
null,
authFingerprint.getBytes(StandardCharsets.UTF_8),
Collections.emptySet());
// following assertion asserts that VersionedFlowSynchronizer is used
assertFalse(proposedDataFlow.isXml());
controller.synchronize(standardFlowSynchronizer, proposedDataFlow, mock(FlowService.class), BundleUpdateStrategy.IGNORE_BUNDLE);
// should be an empty dataflow
final Map<String, Integer> componentCounts = controller.getFlowManager().getComponentCounts();
assertEquals(0, componentCounts.get("Processors").intValue());
assertEquals(0, componentCounts.get("Controller Services").intValue());
assertEquals(0, componentCounts.get("Reporting Tasks").intValue());
assertEquals(0, componentCounts.get("Process Groups").intValue());
assertEquals(0, componentCounts.get("Remote Process Groups").intValue());
assertEquals(0, componentCounts.get("Local Input Ports").intValue());
assertEquals(0, componentCounts.get("Local Output Ports").intValue());
assertEquals(0, componentCounts.get("Public Input Ports").intValue());
assertEquals(0, componentCounts.get("Public Output Ports").intValue());
assertNotNull(controller.getFlowManager().getRootGroup());
}
@Test
public void testSynchronizeJsonFlowMissingComponentIds() throws IOException {
final String authFingerprint = authorizer.getFingerprint();
final File jsonFlowFile = new File("src/test/resources/conf/flow-json-missing-component-id.json");
final String flow = IOUtils.toString(new FileInputStream(jsonFlowFile), StandardCharsets.UTF_8);
final DataFlow proposedDataFlow = new StandardDataFlow(flow.getBytes(StandardCharsets.UTF_8),
null,
authFingerprint.getBytes(StandardCharsets.UTF_8),
Collections.emptySet());
// following assertion asserts that VersionedFlowSynchronizer is used
assertFalse(proposedDataFlow.isXml());
controller.synchronize(standardFlowSynchronizer, proposedDataFlow, mock(FlowService.class), BundleUpdateStrategy.IGNORE_BUNDLE);
final Map<String, Integer> componentCounts = controller.getFlowManager().getComponentCounts();
assertEquals(2, componentCounts.get("Processors").intValue());
assertEquals(0, componentCounts.get("Controller Services").intValue());
assertEquals(0, componentCounts.get("Reporting Tasks").intValue());
assertEquals(0, componentCounts.get("Process Groups").intValue());
assertEquals(0, componentCounts.get("Remote Process Groups").intValue());
assertEquals(0, componentCounts.get("Local Input Ports").intValue());
assertEquals(0, componentCounts.get("Local Output Ports").intValue());
assertEquals(0, componentCounts.get("Public Input Ports").intValue());
assertEquals(0, componentCounts.get("Public Output Ports").intValue());
}
private String getNewJsonFlow() throws JsonProcessingException {
final VersionedDataflow versionedDataflow = new VersionedDataflow();
versionedDataflow.setEncodingVersion(new VersionedFlowEncodingVersion(2, 0));
versionedDataflow.setMaxTimerDrivenThreadCount(10);
versionedDataflow.setRegistries(Collections.emptyList());
versionedDataflow.setParameterContexts(Collections.emptyList());
versionedDataflow.setControllerServices(Collections.emptyList());
versionedDataflow.setReportingTasks(Collections.emptyList());
versionedDataflow.setTemplates(Collections.emptySet());
final VersionedProcessGroup rootGroup = new VersionedProcessGroup();
rootGroup.setIdentifier(UUID.randomUUID().toString());
rootGroup.setInstanceIdentifier(UUID.randomUUID().toString());
rootGroup.setName("NiFi Flow");
rootGroup.setComments("");
rootGroup.setPosition(new Position(0, 0));
rootGroup.setProcessGroups(Collections.emptySet());
rootGroup.setRemoteProcessGroups(Collections.emptySet());
rootGroup.setProcessors(Collections.emptySet());
rootGroup.setInputPorts(Collections.emptySet());
rootGroup.setOutputPorts(Collections.emptySet());
rootGroup.setConnections(Collections.emptySet());
rootGroup.setLabels(Collections.emptySet());
rootGroup.setFunnels(Collections.emptySet());
rootGroup.setControllerServices(Collections.emptySet());
rootGroup.setVariables(Collections.emptyMap());
rootGroup.setDefaultFlowFileExpiration("0 sec");
rootGroup.setDefaultBackPressureObjectThreshold(10000L);
rootGroup.setDefaultBackPressureDataSizeThreshold("1 GB");
rootGroup.setFlowFileOutboundPolicy("STREAM_WHEN_AVAILABLE");
rootGroup.setFlowFileConcurrency("UNBOUNDED");
rootGroup.setComponentType(ComponentType.PROCESS_GROUP);
versionedDataflow.setRootGroup(rootGroup);
final ObjectMapper mapper = new ObjectMapper();
final String jsonString = mapper.writeValueAsString(versionedDataflow);
return jsonString;
}
}

View File

@ -0,0 +1,161 @@
{
"encodingVersion": {
"majorVersion": 2,
"minorVersion": 0
},
"maxTimerDrivenThreadCount": 10,
"registries": [],
"parameterContexts": [],
"controllerServices": [],
"reportingTasks": [],
"templates": [],
"rootGroup": {
"identifier": "2a2b649d-8538-3239-9965-536b5b993cc5",
"instanceIdentifier": "13c477b8-0182-1000-31df-454d42e70446",
"name": "NiFi Flow",
"comments": "",
"position": {
"x": 0,
"y": 0
},
"processGroups": [],
"remoteProcessGroups": [],
"processors": [
{
"instanceIdentifier": "182b786c-0182-1000-9677-a42b98ed4f7c",
"name": "GenerateFlowFile",
"comments": "",
"position": {
"x": 356,
"y": 133
},
"type": "org.apache.nifi.processors.standard.GenerateFlowFile",
"bundle": {
"group": "org.apache.nifi",
"artifact": "nifi-standard-nar",
"version": "1.17.0-SNAPSHOT"
},
"properties": {
"character-set": "UTF-8",
"File Size": "0B",
"generate-ff-custom-text": "test",
"Batch Size": "1",
"Unique FlowFiles": "false",
"Data Format": "Text"
},
"propertyDescriptors": {},
"style": {},
"schedulingPeriod": "0 sec",
"schedulingStrategy": "TIMER_DRIVEN",
"executionNode": "ALL",
"penaltyDuration": "30 sec",
"yieldDuration": "1 sec",
"bulletinLevel": "WARN",
"runDurationMillis": 0,
"concurrentlySchedulableTaskCount": 1,
"autoTerminatedRelationships": [],
"scheduledState": "ENABLED",
"retryCount": 10,
"retriedRelationships": [],
"backoffMechanism": "PENALIZE_FLOWFILE",
"maxBackoffPeriod": "10 mins",
"componentType": "PROCESSOR",
"groupIdentifier": "2a2b649d-8538-3239-9965-536b5b993cc5"
},
{
"identifier": "eb393564-ce84-3777-a8a8-f923dd912e2f",
"instanceIdentifier": "182bd303-0182-1000-d9ab-a31944d6b6f6",
"name": "LogAttribute",
"comments": "",
"position": {
"x": 360,
"y": 352
},
"type": "org.apache.nifi.processors.standard.LogAttribute",
"bundle": {
"group": "org.apache.nifi",
"artifact": "nifi-standard-nar",
"version": "1.17.0-SNAPSHOT"
},
"properties": {
"character-set": "UTF-8",
"Log FlowFile Properties": "true",
"Log Level": "info",
"attributes-to-log-regex": ".*",
"Output Format": "Line per Attribute",
"Log Payload": "false"
},
"propertyDescriptors": {},
"style": {},
"schedulingPeriod": "0 sec",
"schedulingStrategy": "TIMER_DRIVEN",
"executionNode": "ALL",
"penaltyDuration": "30 sec",
"yieldDuration": "1 sec",
"bulletinLevel": "WARN",
"runDurationMillis": 0,
"concurrentlySchedulableTaskCount": 1,
"autoTerminatedRelationships": [
"success"
],
"scheduledState": "ENABLED",
"retryCount": 10,
"retriedRelationships": [],
"backoffMechanism": "PENALIZE_FLOWFILE",
"maxBackoffPeriod": "10 mins",
"componentType": "PROCESSOR",
"groupIdentifier": "2a2b649d-8538-3239-9965-536b5b993cc5"
}
],
"inputPorts": [],
"outputPorts": [],
"connections": [
{
"identifier": "64a51da4-0263-3d40-ab64-055bee2a8856",
"instanceIdentifier": "182c0ec9-0182-1000-942b-c928cf2c52f2",
"name": "",
"source": {
"id": "365da014-6f15-3dbd-a8a0-3431923790ba",
"type": "PROCESSOR",
"groupId": "2a2b649d-8538-3239-9965-536b5b993cc5",
"name": "GenerateFlowFile",
"comments": "",
"instanceIdentifier": "182b786c-0182-1000-9677-a42b98ed4f7c"
},
"destination": {
"id": "eb393564-ce84-3777-a8a8-f923dd912e2f",
"type": "PROCESSOR",
"groupId": "2a2b649d-8538-3239-9965-536b5b993cc5",
"name": "LogAttribute",
"comments": "",
"instanceIdentifier": "182bd303-0182-1000-d9ab-a31944d6b6f6"
},
"labelIndex": 1,
"zIndex": 0,
"selectedRelationships": [
"success"
],
"backPressureObjectThreshold": 10000,
"backPressureDataSizeThreshold": "1 GB",
"flowFileExpiration": "0 sec",
"prioritizers": [],
"bends": [],
"loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
"partitioningAttribute": "",
"loadBalanceCompression": "DO_NOT_COMPRESS",
"componentType": "CONNECTION",
"groupIdentifier": "2a2b649d-8538-3239-9965-536b5b993cc5"
}
],
"labels": [],
"funnels": [],
"controllerServices": [],
"variables": {},
"defaultFlowFileExpiration": "0 sec",
"defaultBackPressureObjectThreshold": 10000,
"defaultBackPressureDataSizeThreshold": "1 GB",
"flowFileConcurrency": "UNBOUNDED",
"flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
"componentType": "PROCESS_GROUP"
}
}

View File

@ -89,8 +89,8 @@ public class StandardFlowDifference implements FlowDifference {
@Override
public int hashCode() {
return 31 + 17 * (componentA == null ? 0 : componentA.getIdentifier().hashCode()) +
17 * (componentB == null ? 0 : componentB.getIdentifier().hashCode()) +
return 31 + 17 * (componentA == null ? 0 : Objects.hashCode(componentA.getIdentifier())) +
17 * (componentB == null ? 0 : Objects.hashCode(componentB.getIdentifier())) +
15 * (componentA == null ? 0 : Objects.hash(componentA.getInstanceIdentifier())) +
15 * (componentB == null ? 0 : Objects.hash(componentB.getInstanceIdentifier())) +
Objects.hash(description, type, valueA, valueB);