NIFI-3607 Allow multi files mode with fixed names

This closes: #1608

Signed-off-by: Andre F de Miranda <trixpan@users.noreply.github.com>
This commit is contained in:
Pierre Villard 2017-03-20 15:28:27 +01:00 committed by Andre F de Miranda
parent 0054a9e35f
commit 7df5c2dc89
4 changed files with 75 additions and 9 deletions

View File

@ -36,9 +36,9 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -87,6 +87,7 @@ public class StandardProcessorTestRunner implements TestRunner {
private int numThreads = 1;
private MockSessionFactory sessionFactory;
private long runSchedule = 0;
private final AtomicInteger invocations = new AtomicInteger(0);
private final Map<String, MockComponentLog> controllerServiceLoggers = new HashMap<>();
@ -177,11 +178,11 @@ public class StandardProcessorTestRunner implements TestRunner {
}
}
final ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(numThreads);
@SuppressWarnings("unchecked")
final Future<Throwable>[] futures = new Future[iterations];
for (int i = 0; i < iterations; i++) {
final Future<Throwable> future = executorService.submit(new RunProcessor());
final Future<Throwable> future = executorService.schedule(new RunProcessor(), i * runSchedule, TimeUnit.MILLISECONDS);
futures[i] = future;
}
@ -907,4 +908,15 @@ public class StandardProcessorTestRunner implements TestRunner {
}
}
}
/**
* Set the Run Schedule parameter (in milliseconds). If set, this will be the duration
* between two calls of the onTrigger method.
*
* @param runSchedule Run schedule duration in milliseconds.
*/
@Override
public void setRunSchedule(long runSchedule) {
this.runSchedule = runSchedule;
}
}

View File

@ -965,4 +965,12 @@ public interface TestRunner {
*/
void enforceReadStreamsClosed(boolean enforce);
/**
* Set the Run Schedule parameter (in milliseconds). If set, this will be the duration
* between two calls of the onTrigger method.
*
* @param runSchedule Run schedule duration in milliseconds.
*/
void setRunSchedule(long runSchedule);
}

View File

@ -44,11 +44,11 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.NullOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@ -108,8 +108,7 @@ public class TailFile extends AbstractProcessor {
+ " In this mode, the file may not exist when starting the processor.");
static final AllowableValue MODE_MULTIFILE = new AllowableValue("Multiple files", "Multiple files",
"In this mode, the 'Files to tail' property accepts a regular expression and the processor will look"
+ " for files in 'Base directory' to list the files to tail by the processor. In this mode, only the files existing"
+ " when starting the processor will be used.");
+ " for files in 'Base directory' to list the files to tail by the processor.");
static final AllowableValue FIXED_NAME = new AllowableValue("Fixed name", "Fixed name", "With this rolling strategy, the files "
+ "where the log messages are appended have always the same name.");
@ -311,8 +310,7 @@ public class TailFile extends AbstractProcessor {
@OnScheduled
public void recoverState(final ProcessContext context) throws IOException {
// set isMultiChanging
isMultiChanging.set(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())
&& context.getProperty(ROLLING_STRATEGY).getValue().equals(CHANGING_NAME.getValue()));
isMultiChanging.set(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue()));
// set last lookup to now
lastLookup.set(new Date().getTime());

View File

@ -20,8 +20,10 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
@ -31,13 +33,15 @@ import java.io.RandomAccessFile;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.processors.standard.TailFile.TailFileState;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -659,6 +663,38 @@ public class TestTailFile {
assertTrue(runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).stream().anyMatch(mockFlowFile -> mockFlowFile.isContentEqual("1\n")));
}
@Test
public void testDetectNewFile() throws IOException, InterruptedException {
runner.setProperty(TailFile.BASE_DIRECTORY, "target");
runner.setProperty(TailFile.MODE, TailFile.MODE_MULTIFILE);
runner.setProperty(TailFile.LOOKUP_FREQUENCY, "1 sec");
runner.setProperty(TailFile.FILENAME, "log_[0-9]*\\.txt");
runner.setProperty(TailFile.RECURSIVE, "false");
runner.setProperty(TailFile.ROLLING_STRATEGY, TailFile.FIXED_NAME);
initializeFile("target/log_1.txt", "firstLine\n");
Runnable task = () -> {
try {
initializeFile("target/log_2.txt", "newFile\n");
} catch (Exception e) {
fail();
}
};
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.schedule(task, 2, TimeUnit.SECONDS);
runner.setRunSchedule(2000);
runner.run(3);
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
assertTrue(runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).stream().anyMatch(mockFlowFile -> mockFlowFile.isContentEqual("firstLine\n")));
assertTrue(runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).stream().anyMatch(mockFlowFile -> mockFlowFile.isContentEqual("newFile\n")));
runner.shutdown();
}
@Test
public void testMultipleFilesWithBasedirAndFilenameEL() throws IOException, InterruptedException {
runner.setVariable("vrBaseDirectory", "target");
@ -933,4 +969,16 @@ public class TestTailFile {
cleanFiles("target/testDir");
}
private RandomAccessFile initializeFile(String path, String data) throws IOException {
File file = new File(path);
if(file.exists()) {
file.delete();
}
assertTrue(file.createNewFile());
RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
randomAccessFile.write(data.getBytes());
randomAccessFile.close();
return randomAccessFile;
}
}