From 9f61b5f957e0389d4929fd7f4e9fd83521bcd4c5 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 25 May 2023 16:10:08 -0400 Subject: [PATCH] NIFI-11591 Improved stability of DynamicallyClassPath and AsyncCommitCallback tests After terminating Processor in DynamicallyModifiedClasspathIT system test, keep trying to empty queue until successful instead of assuming success after the first iteration. When writing to a replacement file in AsyncCommitCallbackIT, write to a temp file and then rename. This prevents the processor from picking up the file while it's still being written. This closes #7302 Signed-off-by: David Handermann --- .../stateless/basics/AsyncCommitCallbackIT.java | 6 +++++- .../processor/DynamicClassPathModificationIT.java | 14 +++++++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/AsyncCommitCallbackIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/AsyncCommitCallbackIT.java index 2006a0ecd9..bf15b0629e 100644 --- a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/AsyncCommitCallbackIT.java +++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/AsyncCommitCallbackIT.java @@ -177,7 +177,11 @@ public class AsyncCommitCallbackIT extends StatelessSystemIT { Thread.sleep(1000L); assertFalse(trigger.getResultNow().isPresent()); - Files.write(replacementFile.toPath(), "Good-bye World".getBytes(), StandardOpenOption.CREATE); + // Write to a temp file, then rename to the replacement file. Otherwise, the file may be created, and then the flow + // run before the data is written to the file, which can cause the FlowFile to be replaced with 0 bytes. + final File tempFile = new File(replacementFile.getParentFile(), "." + replacementFile.getName()); + Files.write(tempFile.toPath(), "Good-bye World".getBytes(), StandardOpenOption.CREATE); + assertTrue(tempFile.renameTo(replacementFile)); assertTrue(inputFile.exists()); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DynamicClassPathModificationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DynamicClassPathModificationIT.java index a5561e40bf..1daa76d15d 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DynamicClassPathModificationIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/DynamicClassPathModificationIT.java @@ -149,8 +149,20 @@ public class DynamicClassPathModificationIT extends NiFiSystemIT { getNifiClient().getProcessorClient().terminateProcessor(modify.getId()); getClientUtil().waitForStoppedProcessor(modify.getId()); - // Empty the queue and generate another FlowFile with a sleep of 0 sec + // Empty the queue. Because the processor was terminated, it may still hold the FlowFile for a bit until the terminate completes. + // Because of that we'll wait until the queue is empty, attempting to empty it if it is not. getClientUtil().emptyQueue(modifyInputConnection.getId()); + waitFor(() -> { + final int queueSize = getConnectionQueueSize(modifyInputConnection.getId()); + if (queueSize == 0) { + return true; + } + + getClientUtil().emptyQueue(modifyInputConnection.getId()); + return false; + }); + + // Generate another FlowFile with a sleep of 0 sec getClientUtil().stopProcessor(generate); getClientUtil().updateProcessorProperties(generate, Collections.singletonMap("sleep", "0 sec")); getClientUtil().waitForValidProcessor(generate.getId());