diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java index 6447e27d05..95e9f5f325 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/flow/TestPopularVoteFlowElection.java @@ -22,6 +22,9 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.nio.file.Files; @@ -33,7 +36,9 @@ import java.util.concurrent.TimeUnit; import org.apache.nifi.cluster.protocol.DataFlow; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.StandardDataFlow; +import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.fingerprint.FingerprintFactory; +import org.apache.nifi.util.NiFiProperties; import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -134,6 +139,71 @@ public class TestPopularVoteFlowElection { } } + @Test + public void testAutoGeneratedVsPopulatedFlowElection() throws IOException { + final FingerprintFactory fingerprintFactory = new FingerprintFactory(StringEncryptor.createEncryptor(getNiFiProperties())); + final PopularVoteFlowElection election = new PopularVoteFlowElection(1, TimeUnit.MINUTES, 4, fingerprintFactory); + final byte[] emptyFlow = Files.readAllBytes(Paths.get("src/test/resources/conf/auto-generated-empty-flow.xml")); + final byte[] nonEmptyFlow = Files.readAllBytes(Paths.get("src/test/resources/conf/reporting-task-flow.xml")); + + for (int i = 0; i < 4; i++) { + assertFalse(election.isElectionComplete()); + assertNull(election.getElectedDataFlow()); + + final DataFlow dataFlow; + if (i % 2 == 0) { + dataFlow = createDataFlow(emptyFlow); + } else { + dataFlow = createDataFlow(nonEmptyFlow); + } + + final DataFlow electedDataFlow = election.castVote(dataFlow, createNodeId(i)); + + if (i == 3) { + assertNotNull(electedDataFlow); + assertEquals(new String(nonEmptyFlow), new String(electedDataFlow.getFlow())); + } else { + assertNull(electedDataFlow); + } + } + } + + @Test + public void testDifferentPopulatedFlowsElection() throws IOException { + final FingerprintFactory fingerprintFactory = new FingerprintFactory(StringEncryptor.createEncryptor(getNiFiProperties())); + final PopularVoteFlowElection election = new PopularVoteFlowElection(1, TimeUnit.MINUTES, 4, fingerprintFactory); + final byte[] nonEmptyCandidateA = Files.readAllBytes(Paths.get("src/test/resources/conf/controller-service-flow.xml")); + final byte[] nonEmptyCandidateB = Files.readAllBytes(Paths.get("src/test/resources/conf/reporting-task-flow.xml")); + + for (int i = 0; i < 4; i++) { + assertFalse(election.isElectionComplete()); + assertNull(election.getElectedDataFlow()); + + final DataFlow dataFlow; + if (i % 2 == 0) { + dataFlow = createDataFlow(nonEmptyCandidateA); + } else { + dataFlow = createDataFlow(nonEmptyCandidateB); + } + + final DataFlow electedDataFlow = election.castVote(dataFlow, createNodeId(i)); + + if (i == 3) { + assertNotNull(electedDataFlow); + assertEquals(new String(nonEmptyCandidateA), new String(electedDataFlow.getFlow())); + } else { + assertNull(electedDataFlow); + } + } + } + + private NiFiProperties getNiFiProperties() { + final NiFiProperties nifiProperties = mock(NiFiProperties.class); + when(nifiProperties.getProperty(StringEncryptor.NF_SENSITIVE_PROPS_ALGORITHM)).thenReturn("PBEWITHMD5AND256BITAES-CBC-OPENSSL"); + when(nifiProperties.getProperty(StringEncryptor.NF_SENSITIVE_PROPS_PROVIDER)).thenReturn("BC"); + when(nifiProperties.getProperty(anyString(), anyString())).then(invocation -> invocation.getArgumentAt(1, String.class)); + return nifiProperties; + } private NodeIdentifier createNodeId(final int index) { return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9000 + index, "localhost", 9000 + index, "localhost", 9000 + index, 9000 + index, true); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/auto-generated-empty-flow.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/auto-generated-empty-flow.xml new file mode 100644 index 0000000000..720fc0b4d9 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/auto-generated-empty-flow.xml @@ -0,0 +1,27 @@ + + + + 10 + 5 + + ee207cce-015d-1000-30bf-36cd2fd1ea5c + NiFi Flow + + + + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/controller-service-flow.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/controller-service-flow.xml new file mode 100644 index 0000000000..4278311558 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/controller-service-flow.xml @@ -0,0 +1,37 @@ + + + + 10 + 5 + + 778f676e-6542-4c18-9d06-24b6fd3a1b29 + NiFi Flow + + + + + edf22ee5-376a-46dc-a38a-919351124457 + ControllerService + + org.apache.nifi.controller.service.mock.ServiceD + false + + Foo1 + Bar1 + + + + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/reporting-task-flow.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/reporting-task-flow.xml new file mode 100644 index 0000000000..751517c33d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/resources/conf/reporting-task-flow.xml @@ -0,0 +1,54 @@ + + + + 10 + 5 + + 7c84501d-d10c-407c-b9f3-1d80e38fe36a + NiFi Flow + + + + + + + 3b80ba0f-a6c0-48db-b721-4dbc04cef28e + AmbariReportingTask + + org.apache.nifi.reporting.ambari.AmbariReportingTask + + org.apache.nifi + nifi-standard-nar + 1.1.0 + + {{nifi_ambari_reporting_frequency}} + RUNNING + TIMER_DRIVEN + + Metrics Collector URL + ${ambari.metrics.collector.url} + + + Application ID + ${ambari.application.id} + + + Hostname + ${hostname(true)} + + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 58bb90fa51..3af270cc50 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -101,6 +101,7 @@ import org.apache.nifi.util.file.FileUtils; import org.apache.nifi.web.api.dto.BundleDTO; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; +import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.FunnelDTO; import org.apache.nifi.web.api.dto.LabelDTO; @@ -146,9 +147,15 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0); final FlowEncodingVersion encodingVersion = FlowEncodingVersion.parse(rootGroupElement); - final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, null, encodingVersion); - return isEmpty(rootGroupDto); + + final NodeList reportingTasks = rootElement.getElementsByTagName("reportingTask"); + final ReportingTaskDTO reportingTaskDTO = reportingTasks.getLength() == 0 ? null : FlowFromDOMFactory.getReportingTask((Element)reportingTasks.item(0),null); + + final NodeList controllerServices = rootElement.getElementsByTagName("controllerService"); + final ControllerServiceDTO controllerServiceDTO = controllerServices.getLength() == 0 ? null : FlowFromDOMFactory.getControllerService((Element)controllerServices.item(0),null); + + return isEmpty(rootGroupDto) && isEmpty(reportingTaskDTO) && isEmpty(controllerServiceDTO); } @Override @@ -537,6 +544,18 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { && CollectionUtils.isEmpty(contents.getRemoteProcessGroups()); } + private static boolean isEmpty(final ReportingTaskDTO reportingTaskDTO){ + + return reportingTaskDTO == null || StringUtils.isEmpty(reportingTaskDTO.getName()) ; + + } + + private static boolean isEmpty(final ControllerServiceDTO controllerServiceDTO){ + + return controllerServiceDTO == null || StringUtils.isEmpty(controllerServiceDTO.getName()); + + } + private static Document parseFlowBytes(final byte[] flow) throws FlowSerializationException { // create document by parsing proposed flow bytes try {