NIFI-11591 Corrected intermittent system test failures

When waiting for a Processor to stop completely in system tests, also consider any terminated threads. Consider the processor stopped only when both the active thread count and the terminated thread count have reached 0. Additionally, when stopping all components in a Process Group, wait for the PG to reflect that there are no running components. Previously, we checked processor active thread counts. However, this was problematic because the flow that was fetched was not fetched recursively and, as a result, processors running in embedded groups were not always stopped when the waitForProcessorsStopped(String groupId) method returned. Finally, removed the step for gathering surefire-reports in failed system tests because it was not working as expected and was unneeded, since the tests' logs were already gathered into the diagnostic dump that is uploaded.

Added fix for BatchFlowBetweenGroupsIT. The test would intermittently fail, depending on timing. The test assumed that when the Input Port in Group B was scheduled to run, it would trigger before the FlowFiles in Group A finished processing. The Sleep processor caused a 10 ms delay for each of 5 FlowFiles, which gives the Input Port about 50 ms to trigger. If it took longer to trigger the first time, the test failed. Adjusted the Sleep processor to use 2 seconds, so instead of 50 milliseconds to start, the Input Port now has 10 seconds. Overall this adds about 30-45 seconds to the run time of the test but ensures more-than-adequate time for the test to run, even on extremely slow hardware.

This closes #7289

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2023-05-24 10:21:34 -04:00 committed by exceptionfactory
parent 40157ae5a2
commit 108f841525
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
3 changed files with 46 additions and 12 deletions

View File

@ -110,9 +110,6 @@ jobs:
nifi-system-tests/nifi-system-test-suite/target/surefire-reports/**/*.txt
nifi-system-tests/nifi-system-test-suite/target/troubleshooting/**/*
retention-days: 7
- name: Upload Surefire Report
if: failure()
uses: scacap/action-surefire-report@v1
ubuntu:
timeout-minutes: 120
@ -157,9 +154,6 @@ jobs:
nifi-system-tests/nifi-system-test-suite/target/surefire-reports/**/*.txt
nifi-system-tests/nifi-system-test-suite/target/troubleshooting/**/*
retention-days: 7
- name: Upload Surefire Report
if: failure()
uses: scacap/action-surefire-report@v1
macos:
timeout-minutes: 120
@ -204,6 +198,3 @@ jobs:
nifi-system-tests/nifi-system-test-suite/target/surefire-reports/**/*.txt
nifi-system-tests/nifi-system-test-suite/target/troubleshooting/**/*
retention-days: 7
- name: Upload Surefire Report
if: failure()
uses: scacap/action-surefire-report@v1

View File

@ -67,6 +67,8 @@ import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceSearchValueDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ProcessGroupStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ProcessorStatusSnapshotDTO;
import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
@ -95,6 +97,7 @@ import org.apache.nifi.web.api.entity.ParameterProviderParameterFetchEntity;
import org.apache.nifi.web.api.entity.PortEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProvenanceEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
@ -691,7 +694,8 @@ public class NiFiClientUtil {
return;
}
if (entity.getStatus().getAggregateSnapshot().getActiveThreadCount() == 0) {
final ProcessorStatusSnapshotDTO snapshotDto = entity.getStatus().getAggregateSnapshot();
if (snapshotDto.getActiveThreadCount() == 0 && snapshotDto.getTerminatedThreadCount() == 0) {
return;
}
@ -840,10 +844,49 @@ public class NiFiClientUtil {
scheduleComponentsEntity.setDisconnectedNodeAcknowledged(true);
final ScheduleComponentsEntity scheduleEntity = nifiClient.getFlowClient().scheduleProcessGroupComponents(groupId, scheduleComponentsEntity);
waitForProcessorsStopped(groupId);
waitForNoRunningComponents(groupId);
return scheduleEntity;
}
/**
 * Blocks until {@link #isAnyComponentRunning(String)} reports {@code false} for the given group,
 * polling every 10 milliseconds.
 *
 * <p>If the calling thread is interrupted while sleeping between polls, the interrupt flag is
 * restored and the method returns immediately without waiting any further.</p>
 *
 * @param groupId the identifier of the Process Group to poll
 * @throws NiFiClientException if the underlying status check fails
 * @throws IOException if the underlying status check fails
 */
private void waitForNoRunningComponents(final String groupId) throws NiFiClientException, IOException {
    // Keep polling for as long as something in the group still reports as running.
    while (isAnyComponentRunning(groupId)) {
        try {
            Thread.sleep(10L);
        } catch (final InterruptedException ie) {
            // Preserve the interrupt for callers and stop waiting.
            Thread.currentThread().interrupt();
            return;
        }
    }
}
/**
 * Determines whether the given Process Group still shows signs of activity.
 *
 * <p>The group is considered to have a running component when its aggregate status snapshot
 * reports a positive active thread count or a positive terminated thread count, or when the
 * Process Group entity reports a positive running count. Counts that are {@code null} are
 * treated as zero. The Process Group entity is only fetched when both thread counts are clear.</p>
 *
 * @param groupId the identifier of the Process Group to inspect
 * @return {@code true} if any of the three counts is greater than zero; {@code false} otherwise
 * @throws NiFiClientException if a client call fails
 * @throws IOException if a client call fails
 */
private boolean isAnyComponentRunning(final String groupId) throws NiFiClientException, IOException {
    // NOTE(review): the 'false' argument is presumably the "recursive" flag - confirm against FlowClient.
    final ProcessGroupStatusSnapshotDTO aggregate = nifiClient.getFlowClient()
        .getProcessGroupStatus(groupId, false)
        .getProcessGroupStatus()
        .getAggregateSnapshot();

    // Check thread counts first; the terminated count is only consulted when no threads are active.
    final Integer active = aggregate.getActiveThreadCount();
    boolean threadsOutstanding = active != null && active > 0;
    if (!threadsOutstanding) {
        final Integer terminated = aggregate.getTerminatedThreadCount();
        threadsOutstanding = terminated != null && terminated > 0;
    }
    if (threadsOutstanding) {
        return true;
    }

    // No threads outstanding: fall back to the running-component count reported on the group entity.
    final Integer running = nifiClient.getProcessGroupClient().getProcessGroup(groupId).getRunningCount();
    return running != null && running > 0;
}
private void waitForProcessorsStopped(final String groupId) throws IOException, NiFiClientException {
final ProcessGroupFlowEntity rootGroup = nifiClient.getFlowClient().getProcessGroup(groupId);
final FlowDTO rootFlowDTO = rootGroup.getProcessGroupFlow().getFlow();

View File

@ -51,7 +51,7 @@ public class BatchFlowBetweenGroupsIT extends NiFiSystemIT {
// Start input port for Group B
// Wait for all 5 FlowFiles to be ingested into Group B
// Wait for 5 additional FlowFiles to be queued between A and B
// Ensure that queue between A and B has 2 FlowFiles
// Ensure that queue between Generate and Input Port A has 2 FlowFiles
// Start Output Port of Group B
// Wait for count from CountEvents to equal 25
@ -66,7 +66,7 @@ public class BatchFlowBetweenGroupsIT extends NiFiSystemIT {
getClientUtil().updateProcessorProperties(duplicate, Collections.singletonMap("Output Count", "5"));
final ProcessorEntity sleep = getClientUtil().createProcessor("Sleep", processGroupA.getId());
getClientUtil().updateProcessorProperties(sleep, Collections.singletonMap("onTrigger Sleep Time", "10 ms"));
getClientUtil().updateProcessorProperties(sleep, Collections.singletonMap("onTrigger Sleep Time", "2 sec"));
getClientUtil().createConnection(inputPortA, duplicate);
getClientUtil().createConnection(duplicate, sleep, "success");