mirror of https://github.com/apache/nifi.git
NIFI-11737: Improved performance of FlowSynchronizationIT
- FlowSynchronizationIT no longer requires isDestroyEnvironmentAfterEachTest to return true This closes #7420 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
3ddac48c6d
commit
9709bd6fb7
|
@ -96,16 +96,11 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
|
|||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isDestroyEnvironmentAfterEachTest() {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testParameterUpdateWhileNodeDisconnected() throws NiFiClientException, IOException, InterruptedException {
|
||||
// Add Parameter context with Param1 = 1
|
||||
final ParameterContextEntity parameterContextEntity = getClientUtil().createParameterContext("Context1", Collections.singletonMap("Param1", "1"));
|
||||
final ParameterContextEntity parameterContextEntity = getClientUtil().createParameterContext("testParameterUpdateWhileNodeDisconnected", Collections.singletonMap("Param1", "1"));
|
||||
getClientUtil().setParameterContext("root", parameterContextEntity);
|
||||
|
||||
// Create a GenerateFlowFile that adds an attribute with name 'attr' and a value that references the parameter
|
||||
|
@ -166,7 +161,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
|
|||
final ProcessorEntity countEvents3 = getClientUtil().createProcessor("CountEvents");
|
||||
|
||||
// Create parameter context with a sensitive parameter and set that on the root group
|
||||
final ParameterContextEntity paramContext = getClientUtil().createParameterContext("context1", "MyParameter", "Our Secret", true);
|
||||
final ParameterContextEntity paramContext = getClientUtil().createParameterContext("testSensitivePropertiesInherited", "MyParameter", "Our Secret", true);
|
||||
getClientUtil().setParameterContext("root", paramContext);
|
||||
|
||||
// Set sensitive property of 1 processor to an explicit value and sensitive property of another to a sensitive parameter.
|
||||
|
@ -202,13 +197,14 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
|
|||
|
||||
@Test
|
||||
public void testComponentsRecreatedOnRejoinCluster() throws NiFiClientException, IOException, InterruptedException {
|
||||
final ProcessGroupEntity topLevel = getClientUtil().createProcessGroup("testComponentsRecreatedOnRejoinCluster", "root");
|
||||
// Build dataflow with processors at root level and an inner group that contains an input port, output port, and a processor, as well as a Controller Service that the processor will use.
|
||||
final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
|
||||
final ProcessGroupEntity group = getClientUtil().createProcessGroup("Inner Group", "root");
|
||||
final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile", topLevel.getId());
|
||||
final ProcessGroupEntity group = getClientUtil().createProcessGroup("Inner Group", topLevel.getId());
|
||||
final PortEntity inPort = getClientUtil().createInputPort("In", group.getId());
|
||||
final PortEntity outPort = getClientUtil().createOutputPort("Out", group.getId());
|
||||
final ProcessorEntity count = getClientUtil().createProcessor("CountFlowFiles", group.getId());
|
||||
final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
|
||||
final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", topLevel.getId());
|
||||
getClientUtil().updateProcessorSchedulingPeriod(generate, "60 sec");
|
||||
|
||||
final ControllerServiceEntity countService = getClientUtil().createControllerService("StandardCountService", group.getId());
|
||||
|
@ -230,7 +226,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
|
|||
reportingTaskProperties.put("Text", "${now():toNumber()}");
|
||||
getClientUtil().updateReportingTaskProperties(reportingTask, reportingTaskProperties);
|
||||
|
||||
final ParameterContextEntity context = getClientUtil().createParameterContext("Context1", "abc", "hello", false);
|
||||
final ParameterContextEntity context = getClientUtil().createParameterContext("testComponentsRecreatedOnRejoinCluster", "abc", "hello", false);
|
||||
|
||||
// Disconnect Node 2
|
||||
disconnectNode(2);
|
||||
|
@ -260,7 +256,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
|
|||
// is on Node 2.
|
||||
switchClientToNode(2);
|
||||
|
||||
final ProcessGroupFlowEntity flow = getNifiClient().getFlowClient(DO_NOT_REPLICATE).getProcessGroup("root");
|
||||
final ProcessGroupFlowEntity flow = getNifiClient().getFlowClient(DO_NOT_REPLICATE).getProcessGroup(topLevel.getId());
|
||||
final FlowDTO flowDto = flow.getProcessGroupFlow().getFlow();
|
||||
assertEquals(2, flowDto.getConnections().size());
|
||||
assertEquals(2, flowDto.getProcessors().size());
|
||||
|
@ -395,11 +391,12 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
|
|||
// Wait for node to show as disconnected because it doesn't have the necessary nar
|
||||
waitForNodeState(2, NodeConnectionState.DISCONNECTED);
|
||||
|
||||
// We need to either restart Node 2 or remove it from the cluster in order to ensure that we can properly shutdown.
|
||||
// Reconnecting to the cluster would require restoring the NAR file and restarting, which will take longer than simply removing the
|
||||
// node from the cluster. So we opt for shutting down the node and removing it from the cluster.
|
||||
// We need to restore the extensions nar and restart the node so that subsequent tests can succeed
|
||||
restoreExtensionsNar(node2);
|
||||
node2.stop();
|
||||
removeNode(2);
|
||||
node2.start();
|
||||
|
||||
waitForAllNodesConnected();
|
||||
}
|
||||
|
||||
private void removeNode(final int index) throws NiFiClientException, IOException, InterruptedException {
|
||||
|
@ -437,6 +434,17 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
|
|||
waitForAllNodesConnected();
|
||||
|
||||
assertTrue(getNifiClient().getProcessorClient().getProcessor(generate.getId()).getComponent().getExtensionMissing());
|
||||
|
||||
// In order to ensure that subsequent tests are able to operate properly, we need to restore the nar and restart
|
||||
node1.stop();
|
||||
node2.stop();
|
||||
|
||||
restoreExtensionsNar(node1);
|
||||
restoreExtensionsNar(node2);
|
||||
|
||||
node1.start(false);
|
||||
node2.start(true);
|
||||
waitForAllNodesConnected();
|
||||
}
|
||||
|
||||
|
||||
|
@ -511,6 +519,12 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
|
|||
assertTrue(extensionsNar.renameTo(backupFile));
|
||||
}
|
||||
|
||||
private void restoreExtensionsNar(final NiFiInstance nifiInstance) {
|
||||
final File backupFile = getExtensionsNar(nifiInstance);
|
||||
final File extensionsNar = new File(backupFile.getParentFile(), backupFile.getName().replace(".backup", ""));
|
||||
assertTrue(backupFile.renameTo(extensionsNar));
|
||||
}
|
||||
|
||||
private File getExtensionsNar(final NiFiInstance nifiInstance) {
|
||||
final File libDir = new File(nifiInstance.getInstanceDirectory(), "lib");
|
||||
final File[] testExtensionsNar = libDir.listFiles(file -> file.getName().startsWith("nifi-system-test-extensions-nar-"));
|
||||
|
|
Loading…
Reference in New Issue