NIFI-4310 - added changes to support detection of reporting tasks and controller services during isEmpty flow check. Added testing scenarios. This closes #2107.

This commit is contained in:
Yolanda M. Davis 2017-08-23 01:53:49 -04:00 committed by Mark Payne
parent 6c426f7a12
commit 0eda71a9a6
5 changed files with 209 additions and 2 deletions

View File

@ -22,6 +22,9 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.file.Files;
@ -33,7 +36,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.fingerprint.FingerprintFactory;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@ -134,6 +139,71 @@ public class TestPopularVoteFlowElection {
}
}
@Test
public void testAutoGeneratedVsPopulatedFlowElection() throws IOException {
final FingerprintFactory fingerprintFactory = new FingerprintFactory(StringEncryptor.createEncryptor(getNiFiProperties()));
final PopularVoteFlowElection election = new PopularVoteFlowElection(1, TimeUnit.MINUTES, 4, fingerprintFactory);
final byte[] emptyFlow = Files.readAllBytes(Paths.get("src/test/resources/conf/auto-generated-empty-flow.xml"));
final byte[] nonEmptyFlow = Files.readAllBytes(Paths.get("src/test/resources/conf/reporting-task-flow.xml"));
for (int i = 0; i < 4; i++) {
assertFalse(election.isElectionComplete());
assertNull(election.getElectedDataFlow());
final DataFlow dataFlow;
if (i % 2 == 0) {
dataFlow = createDataFlow(emptyFlow);
} else {
dataFlow = createDataFlow(nonEmptyFlow);
}
final DataFlow electedDataFlow = election.castVote(dataFlow, createNodeId(i));
if (i == 3) {
assertNotNull(electedDataFlow);
assertEquals(new String(nonEmptyFlow), new String(electedDataFlow.getFlow()));
} else {
assertNull(electedDataFlow);
}
}
}
@Test
public void testDifferentPopulatedFlowsElection() throws IOException {
final FingerprintFactory fingerprintFactory = new FingerprintFactory(StringEncryptor.createEncryptor(getNiFiProperties()));
final PopularVoteFlowElection election = new PopularVoteFlowElection(1, TimeUnit.MINUTES, 4, fingerprintFactory);
final byte[] nonEmptyCandidateA = Files.readAllBytes(Paths.get("src/test/resources/conf/controller-service-flow.xml"));
final byte[] nonEmptyCandidateB = Files.readAllBytes(Paths.get("src/test/resources/conf/reporting-task-flow.xml"));
for (int i = 0; i < 4; i++) {
assertFalse(election.isElectionComplete());
assertNull(election.getElectedDataFlow());
final DataFlow dataFlow;
if (i % 2 == 0) {
dataFlow = createDataFlow(nonEmptyCandidateA);
} else {
dataFlow = createDataFlow(nonEmptyCandidateB);
}
final DataFlow electedDataFlow = election.castVote(dataFlow, createNodeId(i));
if (i == 3) {
assertNotNull(electedDataFlow);
assertEquals(new String(nonEmptyCandidateA), new String(electedDataFlow.getFlow()));
} else {
assertNull(electedDataFlow);
}
}
}
private NiFiProperties getNiFiProperties() {
final NiFiProperties nifiProperties = mock(NiFiProperties.class);
when(nifiProperties.getProperty(StringEncryptor.NF_SENSITIVE_PROPS_ALGORITHM)).thenReturn("PBEWITHMD5AND256BITAES-CBC-OPENSSL");
when(nifiProperties.getProperty(StringEncryptor.NF_SENSITIVE_PROPS_PROVIDER)).thenReturn("BC");
when(nifiProperties.getProperty(anyString(), anyString())).then(invocation -> invocation.getArgumentAt(1, String.class));
return nifiProperties;
}
private NodeIdentifier createNodeId(final int index) {
return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9000 + index, "localhost", 9000 + index, "localhost", 9000 + index, 9000 + index, true);

View File

@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<flowController encoding-version="1.1">
<maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
<maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
<rootGroup>
<id>ee207cce-015d-1000-30bf-36cd2fd1ea5c</id>
<name>NiFi Flow</name>
<position x="0.0" y="0.0"/>
<comment/>
</rootGroup>
<controllerServices/>
<reportingTasks/>
</flowController>

View File

@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<flowController>
<maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
<maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
<rootGroup>
<id>778f676e-6542-4c18-9d06-24b6fd3a1b29</id>
<name>NiFi Flow</name>
<position x="0.0" y="0.0"/>
</rootGroup>
<controllerServices>
<controllerService>
<id>edf22ee5-376a-46dc-a38a-919351124457</id>
<name>ControllerService</name>
<comment/>
<class>org.apache.nifi.controller.service.mock.ServiceD</class>
<enabled>false</enabled>
<property>
<name>Foo1</name>
<value>Bar1</value>
</property>
</controllerService>
</controllerServices>
</flowController>

View File

@ -0,0 +1,54 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<flowController>
<maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
<maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
<rootGroup>
<id>7c84501d-d10c-407c-b9f3-1d80e38fe36a</id>
<name>NiFi Flow</name>
<position x="0.0" y="0.0"/>
<comment/>
</rootGroup>
<controllerServices/>
<reportingTasks>
<reportingTask>
<id>3b80ba0f-a6c0-48db-b721-4dbc04cef28e</id>
<name>AmbariReportingTask</name>
<comment/>
<class>org.apache.nifi.reporting.ambari.AmbariReportingTask</class>
<bundle>
<group>org.apache.nifi</group>
<artifact>nifi-standard-nar</artifact>
<version>1.1.0</version>
</bundle>
<schedulingPeriod>{{nifi_ambari_reporting_frequency}}</schedulingPeriod>
<scheduledState>RUNNING</scheduledState>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<property>
<name>Metrics Collector URL</name>
<value>${ambari.metrics.collector.url}</value>
</property>
<property>
<name>Application ID</name>
<value>${ambari.application.id}</value>
</property>
<property>
<name>Hostname</name>
<value>${hostname(true)}</value>
</property>
</reportingTask>
</reportingTasks>
</flowController>

View File

@ -101,6 +101,7 @@ import org.apache.nifi.util.file.FileUtils;
import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.FunnelDTO;
import org.apache.nifi.web.api.dto.LabelDTO;
@ -146,9 +147,15 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
final FlowEncodingVersion encodingVersion = FlowEncodingVersion.parse(rootGroupElement);
final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, null, encodingVersion);
return isEmpty(rootGroupDto);
final NodeList reportingTasks = rootElement.getElementsByTagName("reportingTask");
final ReportingTaskDTO reportingTaskDTO = reportingTasks.getLength() == 0 ? null : FlowFromDOMFactory.getReportingTask((Element)reportingTasks.item(0),null);
final NodeList controllerServices = rootElement.getElementsByTagName("controllerService");
final ControllerServiceDTO controllerServiceDTO = controllerServices.getLength() == 0 ? null : FlowFromDOMFactory.getControllerService((Element)controllerServices.item(0),null);
return isEmpty(rootGroupDto) && isEmpty(reportingTaskDTO) && isEmpty(controllerServiceDTO);
}
@Override
@ -537,6 +544,18 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
&& CollectionUtils.isEmpty(contents.getRemoteProcessGroups());
}
private static boolean isEmpty(final ReportingTaskDTO reportingTaskDTO){
return reportingTaskDTO == null || StringUtils.isEmpty(reportingTaskDTO.getName()) ;
}
private static boolean isEmpty(final ControllerServiceDTO controllerServiceDTO){
return controllerServiceDTO == null || StringUtils.isEmpty(controllerServiceDTO.getName());
}
private static Document parseFlowBytes(final byte[] flow) throws FlowSerializationException {
// create document by parsing proposed flow bytes
try {