NIFI-11591 Improved stability of DynamicallyClassPath and AsyncCommitCallback tests

After terminating Processor in DynamicallyModifiedClasspathIT system test, keep trying to empty queue until successful instead of assuming success after the first iteration.

When writing to a replacement file in AsyncCommitCallbackIT, write to a temp file and then rename. This prevents the processor from picking up the file while it's still being written.

This closes #7302

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2023-05-25 16:10:08 -04:00 committed by exceptionfactory
parent 1dfd85b40b
commit 9f61b5f957
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 18 additions and 2 deletions

View File

@ -177,7 +177,11 @@ public class AsyncCommitCallbackIT extends StatelessSystemIT {
Thread.sleep(1000L);
assertFalse(trigger.getResultNow().isPresent());
Files.write(replacementFile.toPath(), "Good-bye World".getBytes(), StandardOpenOption.CREATE);
// Write to a temp file, then rename to the replacement file. Otherwise, the file may be created, and then the flow
// run before the data is written to the file, which can cause the FlowFile to be replaced with 0 bytes.
final File tempFile = new File(replacementFile.getParentFile(), "." + replacementFile.getName());
Files.write(tempFile.toPath(), "Good-bye World".getBytes(), StandardOpenOption.CREATE);
assertTrue(tempFile.renameTo(replacementFile));
assertTrue(inputFile.exists());

View File

@ -149,8 +149,20 @@ public class DynamicClassPathModificationIT extends NiFiSystemIT {
getNifiClient().getProcessorClient().terminateProcessor(modify.getId());
getClientUtil().waitForStoppedProcessor(modify.getId());
// Empty the queue and generate another FlowFile with a sleep of 0 sec
// Empty the queue. Because the processor was terminated, it may still hold the FlowFile for a bit until the terminate completes.
// Because of that we'll wait until the queue is empty, attempting to empty it if it is not.
getClientUtil().emptyQueue(modifyInputConnection.getId());
waitFor(() -> {
final int queueSize = getConnectionQueueSize(modifyInputConnection.getId());
if (queueSize == 0) {
return true;
}
getClientUtil().emptyQueue(modifyInputConnection.getId());
return false;
});
// Generate another FlowFile with a sleep of 0 sec
getClientUtil().stopProcessor(generate);
getClientUtil().updateProcessorProperties(generate, Collections.singletonMap("sleep", "0 sec"));
getClientUtil().waitForValidProcessor(generate.getId());