diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java index 38e8a83fce..7276a53ace 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java @@ -101,6 +101,7 @@ import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -129,9 +130,11 @@ public class NiFiClientUtil { return client; } - public void startProcessor(final ProcessorEntity currentEntity) throws NiFiClientException, IOException { + public ProcessorEntity startProcessor(final ProcessorEntity currentEntity) throws NiFiClientException, IOException, InterruptedException { + waitForValidationCompleted(currentEntity); + currentEntity.setDisconnectedNodeAcknowledged(true); - getProcessorClient().startProcessor(currentEntity); + return getProcessorClient().startProcessor(currentEntity); } public void stopProcessor(final ProcessorEntity currentEntity) throws NiFiClientException, IOException, InterruptedException { @@ -475,6 +478,19 @@ public class NiFiClientUtil { } } + public void waitForValidationCompleted(final ProcessorEntity processorEntity) throws NiFiClientException, IOException, InterruptedException { + String validationStatus; + do { + final ProcessorEntity currentEntity = getProcessorClient().getProcessor(processorEntity.getId()); + validationStatus = currentEntity.getComponent().getValidationStatus(); + + if (validationStatus.equals(ProcessorDTO.VALIDATING)) { + logger.debug("Waiting for Processor {} to finish validating...", processorEntity.getId()); + Thread.sleep(100L); + } + } while (Objects.equals(validationStatus, ProcessorDTO.VALIDATING)); + } + public void waitForRunningProcessor(final String processorId) throws InterruptedException, IOException, NiFiClientException { waitForProcessorState(processorId, "RUNNING"); } @@ -606,7 +622,9 @@ public class NiFiClientUtil { return counterValues; } - public ScheduleComponentsEntity startProcessGroupComponents(final String groupId) throws NiFiClientException, IOException { + public ScheduleComponentsEntity startProcessGroupComponents(final String groupId) throws NiFiClientException, IOException, InterruptedException { + waitForAllProcessorValidationToComplete(groupId); + final ScheduleComponentsEntity scheduleComponentsEntity = new ScheduleComponentsEntity(); scheduleComponentsEntity.setId(groupId); scheduleComponentsEntity.setState("RUNNING"); @@ -616,6 +634,30 @@ public class NiFiClientUtil { return scheduleEntity; } + private void waitForAllProcessorValidationToComplete(final String groupId) throws NiFiClientException, IOException, InterruptedException { + final Set processors = findAllProcessors(groupId); + + for (final ProcessorEntity processor : processors) { + waitForValidationCompleted(processor); + } + } + + private Set findAllProcessors(final String groupId) throws NiFiClientException, IOException { + final Set processors = new HashSet<>(); + findAllProcessors(groupId, processors); + return processors; + } + + private void findAllProcessors(final String groupId, final Set allProcessors) throws NiFiClientException, IOException { + final ProcessGroupFlowEntity flowEntity = nifiClient.getFlowClient().getProcessGroup(groupId); + final FlowDTO flowDto = flowEntity.getProcessGroupFlow().getFlow(); + allProcessors.addAll(flowDto.getProcessors()); + + for (final ProcessGroupEntity childGroup : flowDto.getProcessGroups()) { + findAllProcessors(childGroup.getId(), allProcessors); + } + } + public ScheduleComponentsEntity stopProcessGroupComponents(final String groupId) throws NiFiClientException, IOException { final ScheduleComponentsEntity scheduleComponentsEntity = new ScheduleComponentsEntity(); scheduleComponentsEntity.setId(groupId); @@ -722,13 +764,13 @@ public class NiFiClientUtil { public void waitForControllerServiceRunStatus(final String id, final String requestedRunStatus) throws NiFiClientException, IOException { while (true) { final ControllerServiceEntity serviceEntity = nifiClient.getControllerServicesClient().getControllerService(id); - final String runStatus = serviceEntity.getStatus().getRunStatus(); - if (requestedRunStatus.equals(runStatus)) { - logger.info("Controller Service [{}] run status [{}] found", id, runStatus); + final String serviceState = serviceEntity.getComponent().getState(); + if (requestedRunStatus.equals(serviceState)) { + logger.info("Controller Service [{}] run status [{}] found", id, serviceState); break; } - logger.info("Controller Service [{}] run status [{}] not matched [{}]: sleeping before retrying", id, runStatus, requestedRunStatus); + logger.info("Controller Service [{}] run status [{}] not matched [{}]: sleeping before retrying", id, serviceState, requestedRunStatus); try { Thread.sleep(500L); @@ -773,7 +815,7 @@ public class NiFiClientUtil { public void waitForControllerServiceValidationStatus(final String controllerServiceId, final String validationStatus) throws NiFiClientException, IOException { while (true) { final ControllerServiceEntity controllerServiceEntity = nifiClient.getControllerServicesClient().getControllerService(controllerServiceId); - final String currentValidationStatus = controllerServiceEntity.getStatus().getValidationStatus(); + final String currentValidationStatus = controllerServiceEntity.getComponent().getValidationStatus(); if (validationStatus.equals(currentValidationStatus)) { logger.info("Controller Service ID [{}] Type [{}] Validation Status [{}] matched", controllerServiceId, controllerServiceEntity.getComponent().getType(), validationStatus); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java index ba8a606a6f..1ead90a772 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java @@ -733,7 +733,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT { getClientUtil().enableControllerService(countB); getClientUtil().enableControllerService(countA); - getNifiClient().getProcessorClient().startProcessor(countFlowFiles); + getClientUtil().startProcessor(countFlowFiles); // Disconnect Node 2. Switch client to direct requests to Node 2 so that we can update the node while it's disconnected. disconnectNode(2); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterAdjustStateIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterAdjustStateIT.java index 6c89de2afb..5811e7b8b9 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterAdjustStateIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterAdjustStateIT.java @@ -70,7 +70,7 @@ public class JoinClusterAdjustStateIT extends NiFiSystemIT { // Start the Processor that requires a file named "monitored" to exist. When we join Node 2 to the cluster, this directory will not exist. // We want to ensure that the Processor does in fact start on its own when the directory is created. - getNifiClient().getProcessorClient().startProcessor(fileProcessor.getId(), fileProcessor.getRevision().getClientId(), 1); + getClientUtil().startProcessor(fileProcessor); getClientUtil().waitForRunningProcessor(fileProcessor.getId()); // Create a new NiFi instance @@ -84,7 +84,7 @@ public class JoinClusterAdjustStateIT extends NiFiSystemIT { // Start the Count Processor on Node 1. When Node 2 joins the cluster, we know that its flow will indicate that the Processor is stopped. // But because the cluster indicates that the Processor is running, the second node should inherit this value and immediately start the Processor also. - getNifiClient().getProcessorClient().startProcessor(countProcessor.getId(), countProcessor.getRevision().getClientId(), 1); + getClientUtil().startProcessor(countProcessor); node2Instance.start(); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithMissingConnectionWithData.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithMissingConnectionWithData.java index d5fcde5625..8694d14fcf 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithMissingConnectionWithData.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithMissingConnectionWithData.java @@ -64,7 +64,7 @@ public class JoinClusterWithMissingConnectionWithData extends NiFiSystemIT { ProcessorEntity generate = getNifiClient().getProcessorClient().getProcessor(GENERATE_UUID); // Start Generate Processor - generate = getNifiClient().getProcessorClient().startProcessor(generate); + generate = getClientUtil().startProcessor(generate); // Wait for data to be queued up, one FlowFile for each node. waitFor(this::isDataQueued); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java index 5e27f24737..d680fc7389 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java @@ -93,7 +93,7 @@ public class LoadBalanceIT extends NiFiSystemIT { getClientUtil().updateConnectionLoadBalancing(connection, LoadBalanceStrategy.ROUND_ROBIN, compression, null); // Generate the data. - getNifiClient().getProcessorClient().startProcessor(generate); + getClientUtil().startProcessor(generate); // Wait until all 20 FlowFiles are queued up. waitFor(() -> { @@ -131,7 +131,7 @@ public class LoadBalanceIT extends NiFiSystemIT { getClientUtil().updateConnectionLoadBalancing(connection, LoadBalanceStrategy.SINGLE_NODE, LoadBalanceCompression.DO_NOT_COMPRESS, null); // Generate the data. - getNifiClient().getProcessorClient().startProcessor(generate); + getClientUtil().startProcessor(generate); // Wait until all 20 FlowFiles are queued up. waitFor(() -> { @@ -187,7 +187,7 @@ public class LoadBalanceIT extends NiFiSystemIT { // Queue 100 FlowFiles. 10 with number=0, 10 with number=1, 10 with number=2, etc. to up 10 with number=9 for (int i=1; i <= 10; i++) { // Generate the data. - getNifiClient().getProcessorClient().startProcessor(generate); + getClientUtil().startProcessor(generate); final int expectedQueueSize = 10 * i; @@ -238,7 +238,7 @@ public class LoadBalanceIT extends NiFiSystemIT { getClientUtil().updateProcessorProperties(generate, generateProperties); // Generate the data. - getNifiClient().getProcessorClient().startProcessor(generate); + getClientUtil().startProcessor(generate); // Wait until all 20 FlowFiles are queued up. waitFor(() -> { @@ -338,7 +338,7 @@ public class LoadBalanceIT extends NiFiSystemIT { getClientUtil().updateConnectionLoadBalancing(connection, LoadBalanceStrategy.ROUND_ROBIN, LoadBalanceCompression.DO_NOT_COMPRESS, null); // Generate the data. - getNifiClient().getProcessorClient().startProcessor(generate); + getClientUtil().startProcessor(generate); // Wait until all 20 FlowFiles are queued up. waitFor(() -> { @@ -380,7 +380,7 @@ public class LoadBalanceIT extends NiFiSystemIT { // times out. while (true) { // Generate the data. - getNifiClient().getProcessorClient().startProcessor(generate); + getClientUtil().startProcessor(generate); // Wait until all 20 FlowFiles are queued up waitFor(() -> { diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ParameterContextIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ParameterContextIT.java index 9526d62540..c9897be52b 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ParameterContextIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ParameterContextIT.java @@ -372,7 +372,7 @@ public class ParameterContextIT extends NiFiSystemIT { waitForValidProcessor(processorId); // Start Processors - getNifiClient().getProcessorClient().startProcessor(processorId, processorEntity.getRevision().getClientId(), 1); + getClientUtil().startProcessor(processorEntity); try { // Update Parameter Context to a long validation time. @@ -388,9 +388,7 @@ public class ParameterContextIT extends NiFiSystemIT { waitForRunningProcessor(processorId); } finally { // Ensure that we stop the processor so that other tests are allowed to change the Parameter Context, etc. - getNifiClient().getProcessorClient().stopProcessor(processorId, processorEntity.getRevision().getClientId(), 2); - waitForStoppedProcessor(processorId); - + getClientUtil().stopProcessor(processorEntity); getNifiClient().getProcessorClient().deleteProcessor(processorId, processorEntity.getRevision().getClientId(), 3); } } @@ -419,7 +417,7 @@ public class ParameterContextIT extends NiFiSystemIT { processorEntity.getComponent().getConfig().setAutoTerminatedRelationships(Collections.singleton("success")); getNifiClient().getProcessorClient().updateProcessor(processorEntity); - getNifiClient().getProcessorClient().startProcessor(processorId, processorEntity.getRevision().getClientId(), 1L); + getClientUtil().startProcessor(processorEntity); try { final ParameterContextUpdateRequestEntity requestEntity = updateParameterContext(createdContextEntity, "sleep", "6 secs"); @@ -435,8 +433,7 @@ public class ParameterContextIT extends NiFiSystemIT { waitForRunningProcessor(processorId); } finally { - getNifiClient().getProcessorClient().stopProcessor(processorId, processorEntity.getRevision().getClientId(), 1L); - waitForStoppedProcessor(processorId); + getClientUtil().stopProcessor(processorEntity); getNifiClient().getProcessorClient().deleteProcessor(processorId, processorEntity.getRevision().getClientId(), 3); } } finally { @@ -532,7 +529,7 @@ public class ParameterContextIT extends NiFiSystemIT { getClientUtil().setAutoTerminatedRelationships(splitByLine, Collections.singleton("success")); getClientUtil().createConnection(generate, splitByLine, "success"); - getNifiClient().getProcessorClient().startProcessor(splitByLine); + getClientUtil().startProcessor(splitByLine); // Change parameter to an invalid value. This will result in the processor being stopped, becoming invalid, and then being transitioned to a 'starting' state while invalid. final ParameterContextUpdateRequestEntity updateToInvalidRequestEntity = updateParameterContext(contextEntity, "clone", "invalid"); @@ -575,8 +572,8 @@ public class ParameterContextIT extends NiFiSystemIT { final ProcessorEntity secondProcessorEntity = createProcessor(TEST_PROCESSORS_PACKAGE + ".CountEvents", NIFI_GROUP_ID, TEST_EXTENSIONS_ARTIFACT_ID, getNiFiVersion()); // Start Processors - getNifiClient().getProcessorClient().startProcessor(processorEntity.getId(), processorEntity.getRevision().getClientId(), 1L); - getNifiClient().getProcessorClient().startProcessor(secondProcessorEntity.getId(), secondProcessorEntity.getRevision().getClientId(), 1L); + getClientUtil().startProcessor(processorEntity); + getClientUtil().startProcessor(secondProcessorEntity); Map counterValues = waitForCounter(processorEntity.getId(), "Scheduled", getNumberOfNodes()); assertFalse(counterValues.containsKey("Stopped")); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/BatchFlowBetweenGroupsIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/BatchFlowBetweenGroupsIT.java index 5c682b07f3..8fe795751c 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/BatchFlowBetweenGroupsIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/BatchFlowBetweenGroupsIT.java @@ -108,7 +108,7 @@ public class BatchFlowBetweenGroupsIT extends NiFiSystemIT { getClientUtil().startProcessGroupComponents(processGroupA.getId()); // Start generate processor and wait for data to queue up. Then stop. - getNifiClient().getProcessorClient().startProcessor(generate); + getClientUtil().startProcessor(generate); waitForQueueNotEmpty(generateToInputPortA.getId()); getNifiClient().getProcessorClient().stopProcessor(generate); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java index 99a28ec5a0..7c96e82edf 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java @@ -127,7 +127,7 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT { // Start generate so that data is created. Start Input Port so that the data is ingested. // Start Output Ports but not the Sleep processor. This will keep data queued up for the Sleep processor, // and that should prevent data from being transferred by Output Port "Out2" also. - getNifiClient().getProcessorClient().startProcessor(generate); + getClientUtil().startProcessor(generate); getNifiClient().getInputPortClient().startInputPort(inputPort); getNifiClient().getOutputPortClient().startOutputPort(outputPort); getNifiClient().getOutputPortClient().startOutputPort(secondOut); @@ -143,7 +143,7 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT { } // Start Sleep - getNifiClient().getProcessorClient().startProcessor(sleep); + getClientUtil().startProcessor(sleep); // Data should now flow from both output ports. waitForQueueCount(inputToSleep.getId(), 0); @@ -192,7 +192,7 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT { // Start generate so that data is created. Start Input Port so that the data is ingested. // Start "Out" Output Ports but "Out2.". This will keep data queued up for the Out2 output port. - getNifiClient().getProcessorClient().startProcessor(generate); + getClientUtil().startProcessor(generate); getNifiClient().getInputPortClient().startInputPort(inputPort); getNifiClient().getOutputPortClient().startOutputPort(outputPort); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ProvenanceRepositoryIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ProvenanceRepositoryIT.java index 5665a46bf8..a167dfa261 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ProvenanceRepositoryIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ProvenanceRepositoryIT.java @@ -65,8 +65,8 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT { getClientUtil().setAutoTerminatedRelationships(count, "success"); getClientUtil().createConnection(generateFlowFile, count, "success"); - getNifiClient().getProcessorClient().startProcessor(generateFlowFile); - getNifiClient().getProcessorClient().startProcessor(count); + getClientUtil().startProcessor(generateFlowFile); + getClientUtil().startProcessor(count); ProvenanceSearchValueDTO searchValueDto = new ProvenanceSearchValueDTO(); searchValueDto.setValue(generateFlowFile.getId()); @@ -102,7 +102,7 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT { getClientUtil().setAutoTerminatedRelationships(terminate, "success"); getClientUtil().createConnection(generateFlowFile, terminate, "success"); - generateFlowFile = getNifiClient().getProcessorClient().startProcessor(generateFlowFile); + getClientUtil().startProcessor(generateFlowFile); ProvenanceSearchValueDTO searchValueDto = new ProvenanceSearchValueDTO(); searchValueDto.setValue(generateFlowFile.getId()); @@ -118,7 +118,7 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT { // The GenerateFlowFile would have 800. The first 200 events from Terminate will be in the first Event File, causing that one to // roll over and subsequently be aged off. The second Event File will hold the other 600. So we may have 600 or 800 events, // depending on when the query is executed. - getNifiClient().getProcessorClient().startProcessor(terminate); + getClientUtil().startProcessor(terminate); ProvenanceSearchValueDTO terminateSearchValueDto = new ProvenanceSearchValueDTO(); terminateSearchValueDto.setValue(terminate.getId()); @@ -131,7 +131,7 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT { // Emit 25 more events getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Batch Size", "25")); - getNifiClient().getProcessorClient().startProcessor(generateFlowFile); + getClientUtil().startProcessor(generateFlowFile); // Wait for those 25 events to be emitted waitForEventCountAtLeast(generateSearchTerms, 25); @@ -152,7 +152,7 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT { getClientUtil().setAutoTerminatedRelationships(terminate, "success"); getClientUtil().createConnection(generateFlowFile, terminate, "success"); - generateFlowFile = getNifiClient().getProcessorClient().startProcessor(generateFlowFile); + getClientUtil().startProcessor(generateFlowFile); ProvenanceSearchValueDTO searchValueDto = new ProvenanceSearchValueDTO(); searchValueDto.setValue(generateFlowFile.getId()); @@ -168,7 +168,7 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT { // The GenerateFlowFile would have 800. The first 200 events from Terminate will be in the first Event File, causing that one to // roll over and subsequently be aged off. The second Event File will hold the other 600. So we may have 600 or 800 events, // depending on when the query is executed. - getNifiClient().getProcessorClient().startProcessor(terminate); + getClientUtil().startProcessor(terminate); ProvenanceSearchValueDTO terminateSearchValueDto = new ProvenanceSearchValueDTO(); terminateSearchValueDto.setValue(terminate.getId()); @@ -193,7 +193,7 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT { // Emit 400 more events generateFlowFile.getRevision().setVersion(0L); // Reset the revision getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Batch Size", "400")); - getNifiClient().getProcessorClient().startProcessor(generateFlowFile); + getClientUtil().startProcessor(generateFlowFile); // Since we restarted, the previous Event File will be rolled over. And since it will be > 1 KB in size, it will age off almost immediately. // This will leave us with only the 400 newly created events. diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentAccessIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentAccessIT.java index bca8e1c628..b6ee614eb8 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentAccessIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentAccessIT.java @@ -97,11 +97,11 @@ public class ContentAccessIT extends NiFiSystemIT { final ConnectionEntity verifyToTerminateUnmatched = getClientUtil().createConnection(verify, terminateAa, "unmatched"); // Run Generate processor, wait for its output - getNifiClient().getProcessorClient().startProcessor(generate); + getClientUtil().startProcessor(generate); waitForQueueCount(generateToSplit.getId(), 1); // Run split processor, wait for its output - getNifiClient().getProcessorClient().startProcessor(split); + getClientUtil().startProcessor(split); waitForQueueCount(splitToReverse.getId(), 3); // Verify output of the Split processor @@ -122,7 +122,7 @@ public class ContentAccessIT extends NiFiSystemIT { assertTrue(splitContents.contains("{ a : c }")); // Start the reverse processor, wait for its output - getNifiClient().getProcessorClient().startProcessor(reverse); + getClientUtil().startProcessor(reverse); waitForQueueCount(reverseToVerify.getId(), 3); final String firstReversedContents = getClientUtil().getFlowFileContentAsUtf8(reverseToVerify.getId(), 0); @@ -140,7 +140,7 @@ public class ContentAccessIT extends NiFiSystemIT { // Start verify processor. This is different than verify the contents above because doing so above is handled by making a REST call, which does not make use // of the ProcessSession. Using the VerifyContents processor ensures that the Processors see the same contents. - getNifiClient().getProcessorClient().startProcessor(verify); + getClientUtil().startProcessor(verify); waitForQueueCount(verifyToTerminateAa.getId(), 1); waitForQueueCount(verifyToTerminateBa.getId(), 1); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/FlowFileRestorationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/FlowFileRestorationIT.java index 747103cdb7..4a19a07c05 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/FlowFileRestorationIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/FlowFileRestorationIT.java @@ -50,7 +50,7 @@ public class FlowFileRestorationIT extends NiFiSystemIT { final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile"); final ConnectionEntity connection = getClientUtil().createConnection(generator, terminate, "success"); - getNifiClient().getProcessorClient().startProcessor(generator); + getClientUtil().startProcessor(generator); waitForQueueCount(connection.getId(), 1); getNifiClient().getProcessorClient().stopProcessor(generator); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/rpg/RemoteProcessGroupIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/rpg/RemoteProcessGroupIT.java index d58e15aea8..045a2fe3a0 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/rpg/RemoteProcessGroupIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/rpg/RemoteProcessGroupIT.java @@ -102,7 +102,7 @@ public class RemoteProcessGroupIT extends NiFiSystemIT { getNifiClient().getInputPortClient().startInputPort(port); getClientUtil().waitForValidProcessor(generateFlowFile.getId()); - getNifiClient().getProcessorClient().startProcessor(generateFlowFile); + getClientUtil().startProcessor(generateFlowFile); getNifiClient().getRemoteProcessGroupClient().startTransmitting(rpg); waitFor(() -> util.getQueueSize(generateToRPG.getId()).getObjectCount() == 0); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/variables/ProcessGroupVariablesIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/variables/ProcessGroupVariablesIT.java index 3217a3ae2e..1fb428ca30 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/variables/ProcessGroupVariablesIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/variables/ProcessGroupVariablesIT.java @@ -50,7 +50,7 @@ public class ProcessGroupVariablesIT extends NiFiSystemIT { getClientUtil().waitForValidProcessor(generateFlowFile.getId()); // Start Processor, wait for 1 FlowFile to be queued up, then stop processor - getNifiClient().getProcessorClient().startProcessor(generateFlowFile); + getClientUtil().startProcessor(generateFlowFile); waitForQueueCount(connection.getId(), 1); final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0); @@ -89,7 +89,7 @@ public class ProcessGroupVariablesIT extends NiFiSystemIT { getClientUtil().waitForValidProcessor(generateFlowFile.getId()); // Start Processor, wait for 1 FlowFile to be queued up, then stop processor - getNifiClient().getProcessorClient().startProcessor(generateFlowFile); + getClientUtil().startProcessor(generateFlowFile); waitForQueueCount(connection.getId(), 1); final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0); @@ -129,7 +129,7 @@ public class ProcessGroupVariablesIT extends NiFiSystemIT { getClientUtil().waitForValidProcessor(generateFlowFile.getId()); // Start Processor, wait for 1 FlowFile to be queued up, then stop processor - getNifiClient().getProcessorClient().startProcessor(generateFlowFile); + getClientUtil().startProcessor(generateFlowFile); waitForQueueCount(connection.getId(), 1); final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0);