mirror of
https://github.com/apache/nifi.git
synced 2025-02-10 03:55:22 +00:00
NIFI-10290: Updated system tests to ensure that they do not attempt to start processors while the processors are still validating.
This closes #6253 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
c77f85aafb
commit
d5386acb4f
@ -101,6 +101,7 @@ import java.util.Date;
|
|||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@ -129,9 +130,11 @@ public class NiFiClientUtil {
|
|||||||
return client;
|
return client;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void startProcessor(final ProcessorEntity currentEntity) throws NiFiClientException, IOException {
|
public ProcessorEntity startProcessor(final ProcessorEntity currentEntity) throws NiFiClientException, IOException, InterruptedException {
|
||||||
|
waitForValidationCompleted(currentEntity);
|
||||||
|
|
||||||
currentEntity.setDisconnectedNodeAcknowledged(true);
|
currentEntity.setDisconnectedNodeAcknowledged(true);
|
||||||
getProcessorClient().startProcessor(currentEntity);
|
return getProcessorClient().startProcessor(currentEntity);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void stopProcessor(final ProcessorEntity currentEntity) throws NiFiClientException, IOException, InterruptedException {
|
public void stopProcessor(final ProcessorEntity currentEntity) throws NiFiClientException, IOException, InterruptedException {
|
||||||
@ -475,6 +478,19 @@ public class NiFiClientUtil {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void waitForValidationCompleted(final ProcessorEntity processorEntity) throws NiFiClientException, IOException, InterruptedException {
|
||||||
|
String validationStatus;
|
||||||
|
do {
|
||||||
|
final ProcessorEntity currentEntity = getProcessorClient().getProcessor(processorEntity.getId());
|
||||||
|
validationStatus = currentEntity.getComponent().getValidationStatus();
|
||||||
|
|
||||||
|
if (validationStatus.equals(ProcessorDTO.VALIDATING)) {
|
||||||
|
logger.debug("Waiting for Processor {} to finish validating...", processorEntity.getId());
|
||||||
|
Thread.sleep(100L);
|
||||||
|
}
|
||||||
|
} while (Objects.equals(validationStatus, ProcessorDTO.VALIDATING));
|
||||||
|
}
|
||||||
|
|
||||||
public void waitForRunningProcessor(final String processorId) throws InterruptedException, IOException, NiFiClientException {
|
public void waitForRunningProcessor(final String processorId) throws InterruptedException, IOException, NiFiClientException {
|
||||||
waitForProcessorState(processorId, "RUNNING");
|
waitForProcessorState(processorId, "RUNNING");
|
||||||
}
|
}
|
||||||
@ -606,7 +622,9 @@ public class NiFiClientUtil {
|
|||||||
return counterValues;
|
return counterValues;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ScheduleComponentsEntity startProcessGroupComponents(final String groupId) throws NiFiClientException, IOException {
|
public ScheduleComponentsEntity startProcessGroupComponents(final String groupId) throws NiFiClientException, IOException, InterruptedException {
|
||||||
|
waitForAllProcessorValidationToComplete(groupId);
|
||||||
|
|
||||||
final ScheduleComponentsEntity scheduleComponentsEntity = new ScheduleComponentsEntity();
|
final ScheduleComponentsEntity scheduleComponentsEntity = new ScheduleComponentsEntity();
|
||||||
scheduleComponentsEntity.setId(groupId);
|
scheduleComponentsEntity.setId(groupId);
|
||||||
scheduleComponentsEntity.setState("RUNNING");
|
scheduleComponentsEntity.setState("RUNNING");
|
||||||
@ -616,6 +634,30 @@ public class NiFiClientUtil {
|
|||||||
return scheduleEntity;
|
return scheduleEntity;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void waitForAllProcessorValidationToComplete(final String groupId) throws NiFiClientException, IOException, InterruptedException {
|
||||||
|
final Set<ProcessorEntity> processors = findAllProcessors(groupId);
|
||||||
|
|
||||||
|
for (final ProcessorEntity processor : processors) {
|
||||||
|
waitForValidationCompleted(processor);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Set<ProcessorEntity> findAllProcessors(final String groupId) throws NiFiClientException, IOException {
|
||||||
|
final Set<ProcessorEntity> processors = new HashSet<>();
|
||||||
|
findAllProcessors(groupId, processors);
|
||||||
|
return processors;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void findAllProcessors(final String groupId, final Set<ProcessorEntity> allProcessors) throws NiFiClientException, IOException {
|
||||||
|
final ProcessGroupFlowEntity flowEntity = nifiClient.getFlowClient().getProcessGroup(groupId);
|
||||||
|
final FlowDTO flowDto = flowEntity.getProcessGroupFlow().getFlow();
|
||||||
|
allProcessors.addAll(flowDto.getProcessors());
|
||||||
|
|
||||||
|
for (final ProcessGroupEntity childGroup : flowDto.getProcessGroups()) {
|
||||||
|
findAllProcessors(childGroup.getId(), allProcessors);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public ScheduleComponentsEntity stopProcessGroupComponents(final String groupId) throws NiFiClientException, IOException {
|
public ScheduleComponentsEntity stopProcessGroupComponents(final String groupId) throws NiFiClientException, IOException {
|
||||||
final ScheduleComponentsEntity scheduleComponentsEntity = new ScheduleComponentsEntity();
|
final ScheduleComponentsEntity scheduleComponentsEntity = new ScheduleComponentsEntity();
|
||||||
scheduleComponentsEntity.setId(groupId);
|
scheduleComponentsEntity.setId(groupId);
|
||||||
@ -722,13 +764,13 @@ public class NiFiClientUtil {
|
|||||||
public void waitForControllerServiceRunStatus(final String id, final String requestedRunStatus) throws NiFiClientException, IOException {
|
public void waitForControllerServiceRunStatus(final String id, final String requestedRunStatus) throws NiFiClientException, IOException {
|
||||||
while (true) {
|
while (true) {
|
||||||
final ControllerServiceEntity serviceEntity = nifiClient.getControllerServicesClient().getControllerService(id);
|
final ControllerServiceEntity serviceEntity = nifiClient.getControllerServicesClient().getControllerService(id);
|
||||||
final String runStatus = serviceEntity.getStatus().getRunStatus();
|
final String serviceState = serviceEntity.getComponent().getState();
|
||||||
if (requestedRunStatus.equals(runStatus)) {
|
if (requestedRunStatus.equals(serviceState)) {
|
||||||
logger.info("Controller Service [{}] run status [{}] found", id, runStatus);
|
logger.info("Controller Service [{}] run status [{}] found", id, serviceState);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info("Controller Service [{}] run status [{}] not matched [{}]: sleeping before retrying", id, runStatus, requestedRunStatus);
|
logger.info("Controller Service [{}] run status [{}] not matched [{}]: sleeping before retrying", id, serviceState, requestedRunStatus);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Thread.sleep(500L);
|
Thread.sleep(500L);
|
||||||
@ -773,7 +815,7 @@ public class NiFiClientUtil {
|
|||||||
public void waitForControllerServiceValidationStatus(final String controllerServiceId, final String validationStatus) throws NiFiClientException, IOException {
|
public void waitForControllerServiceValidationStatus(final String controllerServiceId, final String validationStatus) throws NiFiClientException, IOException {
|
||||||
while (true) {
|
while (true) {
|
||||||
final ControllerServiceEntity controllerServiceEntity = nifiClient.getControllerServicesClient().getControllerService(controllerServiceId);
|
final ControllerServiceEntity controllerServiceEntity = nifiClient.getControllerServicesClient().getControllerService(controllerServiceId);
|
||||||
final String currentValidationStatus = controllerServiceEntity.getStatus().getValidationStatus();
|
final String currentValidationStatus = controllerServiceEntity.getComponent().getValidationStatus();
|
||||||
if (validationStatus.equals(currentValidationStatus)) {
|
if (validationStatus.equals(currentValidationStatus)) {
|
||||||
logger.info("Controller Service ID [{}] Type [{}] Validation Status [{}] matched", controllerServiceId,
|
logger.info("Controller Service ID [{}] Type [{}] Validation Status [{}] matched", controllerServiceId,
|
||||||
controllerServiceEntity.getComponent().getType(), validationStatus);
|
controllerServiceEntity.getComponent().getType(), validationStatus);
|
||||||
|
@ -733,7 +733,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
|
|||||||
getClientUtil().enableControllerService(countB);
|
getClientUtil().enableControllerService(countB);
|
||||||
getClientUtil().enableControllerService(countA);
|
getClientUtil().enableControllerService(countA);
|
||||||
|
|
||||||
getNifiClient().getProcessorClient().startProcessor(countFlowFiles);
|
getClientUtil().startProcessor(countFlowFiles);
|
||||||
|
|
||||||
// Disconnect Node 2. Switch client to direct requests to Node 2 so that we can update the node while it's disconnected.
|
// Disconnect Node 2. Switch client to direct requests to Node 2 so that we can update the node while it's disconnected.
|
||||||
disconnectNode(2);
|
disconnectNode(2);
|
||||||
|
@ -70,7 +70,7 @@ public class JoinClusterAdjustStateIT extends NiFiSystemIT {
|
|||||||
|
|
||||||
// Start the Processor that requires a file named "monitored" to exist. When we join Node 2 to the cluster, this directory will not exist.
|
// Start the Processor that requires a file named "monitored" to exist. When we join Node 2 to the cluster, this directory will not exist.
|
||||||
// We want to ensure that the Processor does in fact start on its own when the directory is created.
|
// We want to ensure that the Processor does in fact start on its own when the directory is created.
|
||||||
getNifiClient().getProcessorClient().startProcessor(fileProcessor.getId(), fileProcessor.getRevision().getClientId(), 1);
|
getClientUtil().startProcessor(fileProcessor);
|
||||||
getClientUtil().waitForRunningProcessor(fileProcessor.getId());
|
getClientUtil().waitForRunningProcessor(fileProcessor.getId());
|
||||||
|
|
||||||
// Create a new NiFi instance
|
// Create a new NiFi instance
|
||||||
@ -84,7 +84,7 @@ public class JoinClusterAdjustStateIT extends NiFiSystemIT {
|
|||||||
|
|
||||||
// Start the Count Processor on Node 1. When Node 2 joins the cluster, we know that its flow will indicate that the Processor is stopped.
|
// Start the Count Processor on Node 1. When Node 2 joins the cluster, we know that its flow will indicate that the Processor is stopped.
|
||||||
// But because the cluster indicates that the Processor is running, the second node should inherit this value and immediately start the Processor also.
|
// But because the cluster indicates that the Processor is running, the second node should inherit this value and immediately start the Processor also.
|
||||||
getNifiClient().getProcessorClient().startProcessor(countProcessor.getId(), countProcessor.getRevision().getClientId(), 1);
|
getClientUtil().startProcessor(countProcessor);
|
||||||
|
|
||||||
node2Instance.start();
|
node2Instance.start();
|
||||||
|
|
||||||
|
@ -64,7 +64,7 @@ public class JoinClusterWithMissingConnectionWithData extends NiFiSystemIT {
|
|||||||
ProcessorEntity generate = getNifiClient().getProcessorClient().getProcessor(GENERATE_UUID);
|
ProcessorEntity generate = getNifiClient().getProcessorClient().getProcessor(GENERATE_UUID);
|
||||||
|
|
||||||
// Start Generate Processor
|
// Start Generate Processor
|
||||||
generate = getNifiClient().getProcessorClient().startProcessor(generate);
|
generate = getClientUtil().startProcessor(generate);
|
||||||
|
|
||||||
// Wait for data to be queued up, one FlowFile for each node.
|
// Wait for data to be queued up, one FlowFile for each node.
|
||||||
waitFor(this::isDataQueued);
|
waitFor(this::isDataQueued);
|
||||||
|
@ -93,7 +93,7 @@ public class LoadBalanceIT extends NiFiSystemIT {
|
|||||||
getClientUtil().updateConnectionLoadBalancing(connection, LoadBalanceStrategy.ROUND_ROBIN, compression, null);
|
getClientUtil().updateConnectionLoadBalancing(connection, LoadBalanceStrategy.ROUND_ROBIN, compression, null);
|
||||||
|
|
||||||
// Generate the data.
|
// Generate the data.
|
||||||
getNifiClient().getProcessorClient().startProcessor(generate);
|
getClientUtil().startProcessor(generate);
|
||||||
|
|
||||||
// Wait until all 20 FlowFiles are queued up.
|
// Wait until all 20 FlowFiles are queued up.
|
||||||
waitFor(() -> {
|
waitFor(() -> {
|
||||||
@ -131,7 +131,7 @@ public class LoadBalanceIT extends NiFiSystemIT {
|
|||||||
getClientUtil().updateConnectionLoadBalancing(connection, LoadBalanceStrategy.SINGLE_NODE, LoadBalanceCompression.DO_NOT_COMPRESS, null);
|
getClientUtil().updateConnectionLoadBalancing(connection, LoadBalanceStrategy.SINGLE_NODE, LoadBalanceCompression.DO_NOT_COMPRESS, null);
|
||||||
|
|
||||||
// Generate the data.
|
// Generate the data.
|
||||||
getNifiClient().getProcessorClient().startProcessor(generate);
|
getClientUtil().startProcessor(generate);
|
||||||
|
|
||||||
// Wait until all 20 FlowFiles are queued up.
|
// Wait until all 20 FlowFiles are queued up.
|
||||||
waitFor(() -> {
|
waitFor(() -> {
|
||||||
@ -187,7 +187,7 @@ public class LoadBalanceIT extends NiFiSystemIT {
|
|||||||
// Queue 100 FlowFiles. 10 with number=0, 10 with number=1, 10 with number=2, etc. to up 10 with number=9
|
// Queue 100 FlowFiles. 10 with number=0, 10 with number=1, 10 with number=2, etc. to up 10 with number=9
|
||||||
for (int i=1; i <= 10; i++) {
|
for (int i=1; i <= 10; i++) {
|
||||||
// Generate the data.
|
// Generate the data.
|
||||||
getNifiClient().getProcessorClient().startProcessor(generate);
|
getClientUtil().startProcessor(generate);
|
||||||
|
|
||||||
final int expectedQueueSize = 10 * i;
|
final int expectedQueueSize = 10 * i;
|
||||||
|
|
||||||
@ -238,7 +238,7 @@ public class LoadBalanceIT extends NiFiSystemIT {
|
|||||||
getClientUtil().updateProcessorProperties(generate, generateProperties);
|
getClientUtil().updateProcessorProperties(generate, generateProperties);
|
||||||
|
|
||||||
// Generate the data.
|
// Generate the data.
|
||||||
getNifiClient().getProcessorClient().startProcessor(generate);
|
getClientUtil().startProcessor(generate);
|
||||||
|
|
||||||
// Wait until all 20 FlowFiles are queued up.
|
// Wait until all 20 FlowFiles are queued up.
|
||||||
waitFor(() -> {
|
waitFor(() -> {
|
||||||
@ -338,7 +338,7 @@ public class LoadBalanceIT extends NiFiSystemIT {
|
|||||||
getClientUtil().updateConnectionLoadBalancing(connection, LoadBalanceStrategy.ROUND_ROBIN, LoadBalanceCompression.DO_NOT_COMPRESS, null);
|
getClientUtil().updateConnectionLoadBalancing(connection, LoadBalanceStrategy.ROUND_ROBIN, LoadBalanceCompression.DO_NOT_COMPRESS, null);
|
||||||
|
|
||||||
// Generate the data.
|
// Generate the data.
|
||||||
getNifiClient().getProcessorClient().startProcessor(generate);
|
getClientUtil().startProcessor(generate);
|
||||||
|
|
||||||
// Wait until all 20 FlowFiles are queued up.
|
// Wait until all 20 FlowFiles are queued up.
|
||||||
waitFor(() -> {
|
waitFor(() -> {
|
||||||
@ -380,7 +380,7 @@ public class LoadBalanceIT extends NiFiSystemIT {
|
|||||||
// times out.
|
// times out.
|
||||||
while (true) {
|
while (true) {
|
||||||
// Generate the data.
|
// Generate the data.
|
||||||
getNifiClient().getProcessorClient().startProcessor(generate);
|
getClientUtil().startProcessor(generate);
|
||||||
|
|
||||||
// Wait until all 20 FlowFiles are queued up
|
// Wait until all 20 FlowFiles are queued up
|
||||||
waitFor(() -> {
|
waitFor(() -> {
|
||||||
|
@ -372,7 +372,7 @@ public class ParameterContextIT extends NiFiSystemIT {
|
|||||||
waitForValidProcessor(processorId);
|
waitForValidProcessor(processorId);
|
||||||
|
|
||||||
// Start Processors
|
// Start Processors
|
||||||
getNifiClient().getProcessorClient().startProcessor(processorId, processorEntity.getRevision().getClientId(), 1);
|
getClientUtil().startProcessor(processorEntity);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Update Parameter Context to a long validation time.
|
// Update Parameter Context to a long validation time.
|
||||||
@ -388,9 +388,7 @@ public class ParameterContextIT extends NiFiSystemIT {
|
|||||||
waitForRunningProcessor(processorId);
|
waitForRunningProcessor(processorId);
|
||||||
} finally {
|
} finally {
|
||||||
// Ensure that we stop the processor so that other tests are allowed to change the Parameter Context, etc.
|
// Ensure that we stop the processor so that other tests are allowed to change the Parameter Context, etc.
|
||||||
getNifiClient().getProcessorClient().stopProcessor(processorId, processorEntity.getRevision().getClientId(), 2);
|
getClientUtil().stopProcessor(processorEntity);
|
||||||
waitForStoppedProcessor(processorId);
|
|
||||||
|
|
||||||
getNifiClient().getProcessorClient().deleteProcessor(processorId, processorEntity.getRevision().getClientId(), 3);
|
getNifiClient().getProcessorClient().deleteProcessor(processorId, processorEntity.getRevision().getClientId(), 3);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -419,7 +417,7 @@ public class ParameterContextIT extends NiFiSystemIT {
|
|||||||
processorEntity.getComponent().getConfig().setAutoTerminatedRelationships(Collections.singleton("success"));
|
processorEntity.getComponent().getConfig().setAutoTerminatedRelationships(Collections.singleton("success"));
|
||||||
|
|
||||||
getNifiClient().getProcessorClient().updateProcessor(processorEntity);
|
getNifiClient().getProcessorClient().updateProcessor(processorEntity);
|
||||||
getNifiClient().getProcessorClient().startProcessor(processorId, processorEntity.getRevision().getClientId(), 1L);
|
getClientUtil().startProcessor(processorEntity);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final ParameterContextUpdateRequestEntity requestEntity = updateParameterContext(createdContextEntity, "sleep", "6 secs");
|
final ParameterContextUpdateRequestEntity requestEntity = updateParameterContext(createdContextEntity, "sleep", "6 secs");
|
||||||
@ -435,8 +433,7 @@ public class ParameterContextIT extends NiFiSystemIT {
|
|||||||
|
|
||||||
waitForRunningProcessor(processorId);
|
waitForRunningProcessor(processorId);
|
||||||
} finally {
|
} finally {
|
||||||
getNifiClient().getProcessorClient().stopProcessor(processorId, processorEntity.getRevision().getClientId(), 1L);
|
getClientUtil().stopProcessor(processorEntity);
|
||||||
waitForStoppedProcessor(processorId);
|
|
||||||
getNifiClient().getProcessorClient().deleteProcessor(processorId, processorEntity.getRevision().getClientId(), 3);
|
getNifiClient().getProcessorClient().deleteProcessor(processorId, processorEntity.getRevision().getClientId(), 3);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
@ -532,7 +529,7 @@ public class ParameterContextIT extends NiFiSystemIT {
|
|||||||
getClientUtil().setAutoTerminatedRelationships(splitByLine, Collections.singleton("success"));
|
getClientUtil().setAutoTerminatedRelationships(splitByLine, Collections.singleton("success"));
|
||||||
getClientUtil().createConnection(generate, splitByLine, "success");
|
getClientUtil().createConnection(generate, splitByLine, "success");
|
||||||
|
|
||||||
getNifiClient().getProcessorClient().startProcessor(splitByLine);
|
getClientUtil().startProcessor(splitByLine);
|
||||||
|
|
||||||
// Change parameter to an invalid value. This will result in the processor being stopped, becoming invalid, and then being transitioned to a 'starting' state while invalid.
|
// Change parameter to an invalid value. This will result in the processor being stopped, becoming invalid, and then being transitioned to a 'starting' state while invalid.
|
||||||
final ParameterContextUpdateRequestEntity updateToInvalidRequestEntity = updateParameterContext(contextEntity, "clone", "invalid");
|
final ParameterContextUpdateRequestEntity updateToInvalidRequestEntity = updateParameterContext(contextEntity, "clone", "invalid");
|
||||||
@ -575,8 +572,8 @@ public class ParameterContextIT extends NiFiSystemIT {
|
|||||||
final ProcessorEntity secondProcessorEntity = createProcessor(TEST_PROCESSORS_PACKAGE + ".CountEvents", NIFI_GROUP_ID, TEST_EXTENSIONS_ARTIFACT_ID, getNiFiVersion());
|
final ProcessorEntity secondProcessorEntity = createProcessor(TEST_PROCESSORS_PACKAGE + ".CountEvents", NIFI_GROUP_ID, TEST_EXTENSIONS_ARTIFACT_ID, getNiFiVersion());
|
||||||
|
|
||||||
// Start Processors
|
// Start Processors
|
||||||
getNifiClient().getProcessorClient().startProcessor(processorEntity.getId(), processorEntity.getRevision().getClientId(), 1L);
|
getClientUtil().startProcessor(processorEntity);
|
||||||
getNifiClient().getProcessorClient().startProcessor(secondProcessorEntity.getId(), secondProcessorEntity.getRevision().getClientId(), 1L);
|
getClientUtil().startProcessor(secondProcessorEntity);
|
||||||
|
|
||||||
Map<String, Long> counterValues = waitForCounter(processorEntity.getId(), "Scheduled", getNumberOfNodes());
|
Map<String, Long> counterValues = waitForCounter(processorEntity.getId(), "Scheduled", getNumberOfNodes());
|
||||||
assertFalse(counterValues.containsKey("Stopped"));
|
assertFalse(counterValues.containsKey("Stopped"));
|
||||||
|
@ -108,7 +108,7 @@ public class BatchFlowBetweenGroupsIT extends NiFiSystemIT {
|
|||||||
getClientUtil().startProcessGroupComponents(processGroupA.getId());
|
getClientUtil().startProcessGroupComponents(processGroupA.getId());
|
||||||
|
|
||||||
// Start generate processor and wait for data to queue up. Then stop.
|
// Start generate processor and wait for data to queue up. Then stop.
|
||||||
getNifiClient().getProcessorClient().startProcessor(generate);
|
getClientUtil().startProcessor(generate);
|
||||||
waitForQueueNotEmpty(generateToInputPortA.getId());
|
waitForQueueNotEmpty(generateToInputPortA.getId());
|
||||||
getNifiClient().getProcessorClient().stopProcessor(generate);
|
getNifiClient().getProcessorClient().stopProcessor(generate);
|
||||||
|
|
||||||
|
@ -127,7 +127,7 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT {
|
|||||||
// Start generate so that data is created. Start Input Port so that the data is ingested.
|
// Start generate so that data is created. Start Input Port so that the data is ingested.
|
||||||
// Start Output Ports but not the Sleep processor. This will keep data queued up for the Sleep processor,
|
// Start Output Ports but not the Sleep processor. This will keep data queued up for the Sleep processor,
|
||||||
// and that should prevent data from being transferred by Output Port "Out2" also.
|
// and that should prevent data from being transferred by Output Port "Out2" also.
|
||||||
getNifiClient().getProcessorClient().startProcessor(generate);
|
getClientUtil().startProcessor(generate);
|
||||||
getNifiClient().getInputPortClient().startInputPort(inputPort);
|
getNifiClient().getInputPortClient().startInputPort(inputPort);
|
||||||
getNifiClient().getOutputPortClient().startOutputPort(outputPort);
|
getNifiClient().getOutputPortClient().startOutputPort(outputPort);
|
||||||
getNifiClient().getOutputPortClient().startOutputPort(secondOut);
|
getNifiClient().getOutputPortClient().startOutputPort(secondOut);
|
||||||
@ -143,7 +143,7 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Start Sleep
|
// Start Sleep
|
||||||
getNifiClient().getProcessorClient().startProcessor(sleep);
|
getClientUtil().startProcessor(sleep);
|
||||||
|
|
||||||
// Data should now flow from both output ports.
|
// Data should now flow from both output ports.
|
||||||
waitForQueueCount(inputToSleep.getId(), 0);
|
waitForQueueCount(inputToSleep.getId(), 0);
|
||||||
@ -192,7 +192,7 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT {
|
|||||||
|
|
||||||
// Start generate so that data is created. Start Input Port so that the data is ingested.
|
// Start generate so that data is created. Start Input Port so that the data is ingested.
|
||||||
// Start "Out" Output Ports but "Out2.". This will keep data queued up for the Out2 output port.
|
// Start "Out" Output Ports but "Out2.". This will keep data queued up for the Out2 output port.
|
||||||
getNifiClient().getProcessorClient().startProcessor(generate);
|
getClientUtil().startProcessor(generate);
|
||||||
getNifiClient().getInputPortClient().startInputPort(inputPort);
|
getNifiClient().getInputPortClient().startInputPort(inputPort);
|
||||||
getNifiClient().getOutputPortClient().startOutputPort(outputPort);
|
getNifiClient().getOutputPortClient().startOutputPort(outputPort);
|
||||||
|
|
||||||
|
@ -65,8 +65,8 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT {
|
|||||||
getClientUtil().setAutoTerminatedRelationships(count, "success");
|
getClientUtil().setAutoTerminatedRelationships(count, "success");
|
||||||
getClientUtil().createConnection(generateFlowFile, count, "success");
|
getClientUtil().createConnection(generateFlowFile, count, "success");
|
||||||
|
|
||||||
getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
|
getClientUtil().startProcessor(generateFlowFile);
|
||||||
getNifiClient().getProcessorClient().startProcessor(count);
|
getClientUtil().startProcessor(count);
|
||||||
|
|
||||||
ProvenanceSearchValueDTO searchValueDto = new ProvenanceSearchValueDTO();
|
ProvenanceSearchValueDTO searchValueDto = new ProvenanceSearchValueDTO();
|
||||||
searchValueDto.setValue(generateFlowFile.getId());
|
searchValueDto.setValue(generateFlowFile.getId());
|
||||||
@ -102,7 +102,7 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT {
|
|||||||
getClientUtil().setAutoTerminatedRelationships(terminate, "success");
|
getClientUtil().setAutoTerminatedRelationships(terminate, "success");
|
||||||
getClientUtil().createConnection(generateFlowFile, terminate, "success");
|
getClientUtil().createConnection(generateFlowFile, terminate, "success");
|
||||||
|
|
||||||
generateFlowFile = getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
|
getClientUtil().startProcessor(generateFlowFile);
|
||||||
|
|
||||||
ProvenanceSearchValueDTO searchValueDto = new ProvenanceSearchValueDTO();
|
ProvenanceSearchValueDTO searchValueDto = new ProvenanceSearchValueDTO();
|
||||||
searchValueDto.setValue(generateFlowFile.getId());
|
searchValueDto.setValue(generateFlowFile.getId());
|
||||||
@ -118,7 +118,7 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT {
|
|||||||
// The GenerateFlowFile would have 800. The first 200 events from Terminate will be in the first Event File, causing that one to
|
// The GenerateFlowFile would have 800. The first 200 events from Terminate will be in the first Event File, causing that one to
|
||||||
// roll over and subsequently be aged off. The second Event File will hold the other 600. So we may have 600 or 800 events,
|
// roll over and subsequently be aged off. The second Event File will hold the other 600. So we may have 600 or 800 events,
|
||||||
// depending on when the query is executed.
|
// depending on when the query is executed.
|
||||||
getNifiClient().getProcessorClient().startProcessor(terminate);
|
getClientUtil().startProcessor(terminate);
|
||||||
|
|
||||||
ProvenanceSearchValueDTO terminateSearchValueDto = new ProvenanceSearchValueDTO();
|
ProvenanceSearchValueDTO terminateSearchValueDto = new ProvenanceSearchValueDTO();
|
||||||
terminateSearchValueDto.setValue(terminate.getId());
|
terminateSearchValueDto.setValue(terminate.getId());
|
||||||
@ -131,7 +131,7 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT {
|
|||||||
|
|
||||||
// Emit 25 more events
|
// Emit 25 more events
|
||||||
getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Batch Size", "25"));
|
getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Batch Size", "25"));
|
||||||
getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
|
getClientUtil().startProcessor(generateFlowFile);
|
||||||
|
|
||||||
// Wait for those 25 events to be emitted
|
// Wait for those 25 events to be emitted
|
||||||
waitForEventCountAtLeast(generateSearchTerms, 25);
|
waitForEventCountAtLeast(generateSearchTerms, 25);
|
||||||
@ -152,7 +152,7 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT {
|
|||||||
getClientUtil().setAutoTerminatedRelationships(terminate, "success");
|
getClientUtil().setAutoTerminatedRelationships(terminate, "success");
|
||||||
getClientUtil().createConnection(generateFlowFile, terminate, "success");
|
getClientUtil().createConnection(generateFlowFile, terminate, "success");
|
||||||
|
|
||||||
generateFlowFile = getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
|
getClientUtil().startProcessor(generateFlowFile);
|
||||||
|
|
||||||
ProvenanceSearchValueDTO searchValueDto = new ProvenanceSearchValueDTO();
|
ProvenanceSearchValueDTO searchValueDto = new ProvenanceSearchValueDTO();
|
||||||
searchValueDto.setValue(generateFlowFile.getId());
|
searchValueDto.setValue(generateFlowFile.getId());
|
||||||
@ -168,7 +168,7 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT {
|
|||||||
// The GenerateFlowFile would have 800. The first 200 events from Terminate will be in the first Event File, causing that one to
|
// The GenerateFlowFile would have 800. The first 200 events from Terminate will be in the first Event File, causing that one to
|
||||||
// roll over and subsequently be aged off. The second Event File will hold the other 600. So we may have 600 or 800 events,
|
// roll over and subsequently be aged off. The second Event File will hold the other 600. So we may have 600 or 800 events,
|
||||||
// depending on when the query is executed.
|
// depending on when the query is executed.
|
||||||
getNifiClient().getProcessorClient().startProcessor(terminate);
|
getClientUtil().startProcessor(terminate);
|
||||||
|
|
||||||
ProvenanceSearchValueDTO terminateSearchValueDto = new ProvenanceSearchValueDTO();
|
ProvenanceSearchValueDTO terminateSearchValueDto = new ProvenanceSearchValueDTO();
|
||||||
terminateSearchValueDto.setValue(terminate.getId());
|
terminateSearchValueDto.setValue(terminate.getId());
|
||||||
@ -193,7 +193,7 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT {
|
|||||||
// Emit 400 more events
|
// Emit 400 more events
|
||||||
generateFlowFile.getRevision().setVersion(0L); // Reset the revision
|
generateFlowFile.getRevision().setVersion(0L); // Reset the revision
|
||||||
getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Batch Size", "400"));
|
getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Batch Size", "400"));
|
||||||
getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
|
getClientUtil().startProcessor(generateFlowFile);
|
||||||
|
|
||||||
// Since we restarted, the previous Event File will be rolled over. And since it will be > 1 KB in size, it will age off almost immediately.
|
// Since we restarted, the previous Event File will be rolled over. And since it will be > 1 KB in size, it will age off almost immediately.
|
||||||
// This will leave us with only the 400 newly created events.
|
// This will leave us with only the 400 newly created events.
|
||||||
|
@ -97,11 +97,11 @@ public class ContentAccessIT extends NiFiSystemIT {
|
|||||||
final ConnectionEntity verifyToTerminateUnmatched = getClientUtil().createConnection(verify, terminateAa, "unmatched");
|
final ConnectionEntity verifyToTerminateUnmatched = getClientUtil().createConnection(verify, terminateAa, "unmatched");
|
||||||
|
|
||||||
// Run Generate processor, wait for its output
|
// Run Generate processor, wait for its output
|
||||||
getNifiClient().getProcessorClient().startProcessor(generate);
|
getClientUtil().startProcessor(generate);
|
||||||
waitForQueueCount(generateToSplit.getId(), 1);
|
waitForQueueCount(generateToSplit.getId(), 1);
|
||||||
|
|
||||||
// Run split processor, wait for its output
|
// Run split processor, wait for its output
|
||||||
getNifiClient().getProcessorClient().startProcessor(split);
|
getClientUtil().startProcessor(split);
|
||||||
waitForQueueCount(splitToReverse.getId(), 3);
|
waitForQueueCount(splitToReverse.getId(), 3);
|
||||||
|
|
||||||
// Verify output of the Split processor
|
// Verify output of the Split processor
|
||||||
@ -122,7 +122,7 @@ public class ContentAccessIT extends NiFiSystemIT {
|
|||||||
assertTrue(splitContents.contains("{ a : c }"));
|
assertTrue(splitContents.contains("{ a : c }"));
|
||||||
|
|
||||||
// Start the reverse processor, wait for its output
|
// Start the reverse processor, wait for its output
|
||||||
getNifiClient().getProcessorClient().startProcessor(reverse);
|
getClientUtil().startProcessor(reverse);
|
||||||
waitForQueueCount(reverseToVerify.getId(), 3);
|
waitForQueueCount(reverseToVerify.getId(), 3);
|
||||||
|
|
||||||
final String firstReversedContents = getClientUtil().getFlowFileContentAsUtf8(reverseToVerify.getId(), 0);
|
final String firstReversedContents = getClientUtil().getFlowFileContentAsUtf8(reverseToVerify.getId(), 0);
|
||||||
@ -140,7 +140,7 @@ public class ContentAccessIT extends NiFiSystemIT {
|
|||||||
|
|
||||||
// Start verify processor. This is different than verify the contents above because doing so above is handled by making a REST call, which does not make use
|
// Start verify processor. This is different than verify the contents above because doing so above is handled by making a REST call, which does not make use
|
||||||
// of the ProcessSession. Using the VerifyContents processor ensures that the Processors see the same contents.
|
// of the ProcessSession. Using the VerifyContents processor ensures that the Processors see the same contents.
|
||||||
getNifiClient().getProcessorClient().startProcessor(verify);
|
getClientUtil().startProcessor(verify);
|
||||||
|
|
||||||
waitForQueueCount(verifyToTerminateAa.getId(), 1);
|
waitForQueueCount(verifyToTerminateAa.getId(), 1);
|
||||||
waitForQueueCount(verifyToTerminateBa.getId(), 1);
|
waitForQueueCount(verifyToTerminateBa.getId(), 1);
|
||||||
|
@ -50,7 +50,7 @@ public class FlowFileRestorationIT extends NiFiSystemIT {
|
|||||||
final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
|
final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
|
||||||
final ConnectionEntity connection = getClientUtil().createConnection(generator, terminate, "success");
|
final ConnectionEntity connection = getClientUtil().createConnection(generator, terminate, "success");
|
||||||
|
|
||||||
getNifiClient().getProcessorClient().startProcessor(generator);
|
getClientUtil().startProcessor(generator);
|
||||||
waitForQueueCount(connection.getId(), 1);
|
waitForQueueCount(connection.getId(), 1);
|
||||||
getNifiClient().getProcessorClient().stopProcessor(generator);
|
getNifiClient().getProcessorClient().stopProcessor(generator);
|
||||||
|
|
||||||
|
@ -102,7 +102,7 @@ public class RemoteProcessGroupIT extends NiFiSystemIT {
|
|||||||
|
|
||||||
getNifiClient().getInputPortClient().startInputPort(port);
|
getNifiClient().getInputPortClient().startInputPort(port);
|
||||||
getClientUtil().waitForValidProcessor(generateFlowFile.getId());
|
getClientUtil().waitForValidProcessor(generateFlowFile.getId());
|
||||||
getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
|
getClientUtil().startProcessor(generateFlowFile);
|
||||||
getNifiClient().getRemoteProcessGroupClient().startTransmitting(rpg);
|
getNifiClient().getRemoteProcessGroupClient().startTransmitting(rpg);
|
||||||
|
|
||||||
waitFor(() -> util.getQueueSize(generateToRPG.getId()).getObjectCount() == 0);
|
waitFor(() -> util.getQueueSize(generateToRPG.getId()).getObjectCount() == 0);
|
||||||
|
@ -50,7 +50,7 @@ public class ProcessGroupVariablesIT extends NiFiSystemIT {
|
|||||||
getClientUtil().waitForValidProcessor(generateFlowFile.getId());
|
getClientUtil().waitForValidProcessor(generateFlowFile.getId());
|
||||||
|
|
||||||
// Start Processor, wait for 1 FlowFile to be queued up, then stop processor
|
// Start Processor, wait for 1 FlowFile to be queued up, then stop processor
|
||||||
getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
|
getClientUtil().startProcessor(generateFlowFile);
|
||||||
waitForQueueCount(connection.getId(), 1);
|
waitForQueueCount(connection.getId(), 1);
|
||||||
|
|
||||||
final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0);
|
final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0);
|
||||||
@ -89,7 +89,7 @@ public class ProcessGroupVariablesIT extends NiFiSystemIT {
|
|||||||
getClientUtil().waitForValidProcessor(generateFlowFile.getId());
|
getClientUtil().waitForValidProcessor(generateFlowFile.getId());
|
||||||
|
|
||||||
// Start Processor, wait for 1 FlowFile to be queued up, then stop processor
|
// Start Processor, wait for 1 FlowFile to be queued up, then stop processor
|
||||||
getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
|
getClientUtil().startProcessor(generateFlowFile);
|
||||||
waitForQueueCount(connection.getId(), 1);
|
waitForQueueCount(connection.getId(), 1);
|
||||||
|
|
||||||
final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0);
|
final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0);
|
||||||
@ -129,7 +129,7 @@ public class ProcessGroupVariablesIT extends NiFiSystemIT {
|
|||||||
getClientUtil().waitForValidProcessor(generateFlowFile.getId());
|
getClientUtil().waitForValidProcessor(generateFlowFile.getId());
|
||||||
|
|
||||||
// Start Processor, wait for 1 FlowFile to be queued up, then stop processor
|
// Start Processor, wait for 1 FlowFile to be queued up, then stop processor
|
||||||
getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
|
getClientUtil().startProcessor(generateFlowFile);
|
||||||
waitForQueueCount(connection.getId(), 1);
|
waitForQueueCount(connection.getId(), 1);
|
||||||
|
|
||||||
final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0);
|
final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user