diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 6c356430b2..6fe51951e3 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -36,9 +36,9 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -87,6 +87,7 @@ public class StandardProcessorTestRunner implements TestRunner { private int numThreads = 1; private MockSessionFactory sessionFactory; + private long runSchedule = 0; private final AtomicInteger invocations = new AtomicInteger(0); private final Map controllerServiceLoggers = new HashMap<>(); @@ -177,11 +178,11 @@ public class StandardProcessorTestRunner implements TestRunner { } } - final ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(numThreads); @SuppressWarnings("unchecked") final Future[] futures = new Future[iterations]; for (int i = 0; i < iterations; i++) { - final Future future = executorService.submit(new RunProcessor()); + final Future future = executorService.schedule(new RunProcessor(), i * runSchedule, TimeUnit.MILLISECONDS); futures[i] = future; } @@ -907,4 +908,15 @@ public class StandardProcessorTestRunner implements TestRunner { } } } + + /** + * Set the Run Schedule parameter (in milliseconds). If set, this will be the duration + * between two calls of the onTrigger method. + * + * @param runSchedule Run schedule duration in milliseconds. + */ + @Override + public void setRunSchedule(long runSchedule) { + this.runSchedule = runSchedule; + } } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java index 9a1a10d242..85ef72c4fe 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java @@ -965,4 +965,12 @@ public interface TestRunner { */ void enforceReadStreamsClosed(boolean enforce); + /** + * Set the Run Schedule parameter (in milliseconds). If set, this will be the duration + * between two calls of the onTrigger method. + * + * @param runSchedule Run schedule duration in milliseconds. + */ + void setRunSchedule(long runSchedule); + } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java index 88b83b413e..e2f75bcae7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java @@ -44,11 +44,11 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.stream.io.NullOutputStream; import org.apache.nifi.stream.io.StreamUtils; import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -108,8 +108,7 @@ public class TailFile extends AbstractProcessor { + " In this mode, the file may not exist when starting the processor."); static final AllowableValue MODE_MULTIFILE = new AllowableValue("Multiple files", "Multiple files", "In this mode, the 'Files to tail' property accepts a regular expression and the processor will look" - + " for files in 'Base directory' to list the files to tail by the processor. In this mode, only the files existing" - + " when starting the processor will be used."); + + " for files in 'Base directory' to list the files to tail by the processor."); static final AllowableValue FIXED_NAME = new AllowableValue("Fixed name", "Fixed name", "With this rolling strategy, the files " + "where the log messages are appended have always the same name."); @@ -311,8 +310,7 @@ public class TailFile extends AbstractProcessor { @OnScheduled public void recoverState(final ProcessContext context) throws IOException { // set isMultiChanging - isMultiChanging.set(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue()) - && context.getProperty(ROLLING_STRATEGY).getValue().equals(CHANGING_NAME.getValue())); + isMultiChanging.set(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())); // set last lookup to now lastLookup.set(new Date().getTime()); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java index 227c72702a..485b8e3d14 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java @@ -20,8 +20,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.FilenameFilter; @@ -31,13 +33,15 @@ import java.io.RandomAccessFile; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.processors.standard.TailFile.TailFileState; import org.apache.nifi.state.MockStateManager; -import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -659,6 +663,38 @@ public class TestTailFile { assertTrue(runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).stream().anyMatch(mockFlowFile -> mockFlowFile.isContentEqual("1\n"))); } + @Test + public void testDetectNewFile() throws IOException, InterruptedException { + runner.setProperty(TailFile.BASE_DIRECTORY, "target"); + runner.setProperty(TailFile.MODE, TailFile.MODE_MULTIFILE); + runner.setProperty(TailFile.LOOKUP_FREQUENCY, "1 sec"); + runner.setProperty(TailFile.FILENAME, "log_[0-9]*\\.txt"); + runner.setProperty(TailFile.RECURSIVE, "false"); + runner.setProperty(TailFile.ROLLING_STRATEGY, TailFile.FIXED_NAME); + + initializeFile("target/log_1.txt", "firstLine\n"); + + Runnable task = () -> { + try { + initializeFile("target/log_2.txt", "newFile\n"); + } catch (Exception e) { + fail(); + } + }; + + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + executor.schedule(task, 2, TimeUnit.SECONDS); + + runner.setRunSchedule(2000); + runner.run(3); + + runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2); + assertTrue(runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).stream().anyMatch(mockFlowFile -> mockFlowFile.isContentEqual("firstLine\n"))); + assertTrue(runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).stream().anyMatch(mockFlowFile -> mockFlowFile.isContentEqual("newFile\n"))); + + runner.shutdown(); + } + @Test public void testMultipleFilesWithBasedirAndFilenameEL() throws IOException, InterruptedException { runner.setVariable("vrBaseDirectory", "target"); @@ -933,4 +969,16 @@ public class TestTailFile { cleanFiles("target/testDir"); } + private RandomAccessFile initializeFile(String path, String data) throws IOException { + File file = new File(path); + if(file.exists()) { + file.delete(); + } + assertTrue(file.createNewFile()); + RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw"); + randomAccessFile.write(data.getBytes()); + randomAccessFile.close(); + return randomAccessFile; + } + }