diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/AsyncCommitCallbackIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/AsyncCommitCallbackIT.java index 2006a0ecd9..bf15b0629e 100644 --- a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/AsyncCommitCallbackIT.java +++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/AsyncCommitCallbackIT.java @@ -177,7 +177,11 @@ public class AsyncCommitCallbackIT extends StatelessSystemIT { Thread.sleep(1000L); assertFalse(trigger.getResultNow().isPresent()); - Files.write(replacementFile.toPath(), "Good-bye World".getBytes(), StandardOpenOption.CREATE); + // Write to a temp file, then rename to the replacement file. Otherwise, the file may be created, and then the flow + // run before the data is written to the file, which can cause the FlowFile to be replaced with 0 bytes. + final File tempFile = new File(replacementFile.getParentFile(), "." + replacementFile.getName()); + Files.write(tempFile.toPath(), "Good-bye World".getBytes(), StandardOpenOption.CREATE); + assertTrue(tempFile.renameTo(replacementFile)); assertTrue(inputFile.exists()); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DynamicClassPathModificationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DynamicClassPathModificationIT.java index a5561e40bf..1daa76d15d 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DynamicClassPathModificationIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DynamicClassPathModificationIT.java @@ -149,8 +149,20 @@ public class DynamicClassPathModificationIT extends NiFiSystemIT { getNifiClient().getProcessorClient().terminateProcessor(modify.getId()); getClientUtil().waitForStoppedProcessor(modify.getId()); - // Empty the queue and generate another FlowFile with a sleep of 0 sec + // Empty the queue. Because the processor was terminated, it may still hold the FlowFile for a bit until the terminate completes. + // Because of that we'll wait until the queue is empty, attempting to empty it if it is not. getClientUtil().emptyQueue(modifyInputConnection.getId()); + waitFor(() -> { + final int queueSize = getConnectionQueueSize(modifyInputConnection.getId()); + if (queueSize == 0) { + return true; + } + + getClientUtil().emptyQueue(modifyInputConnection.getId()); + return false; + }); + + // Generate another FlowFile with a sleep of 0 sec getClientUtil().stopProcessor(generate); getClientUtil().updateProcessorProperties(generate, Collections.singletonMap("sleep", "0 sec")); getClientUtil().waitForValidProcessor(generate.getId());