NIFI-8458 TailFile - Fix some bugs. Add more tests.

- During cleanup keep "tailingPostRollover" in the updated state.
- Skipping tests that can't run on Windows.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Tamas Palfy 2021-04-22 16:40:59 +02:00 committed by Mark Payne
parent 90c7d03ed3
commit 16dc61e151
4 changed files with 556 additions and 23 deletions

View File

@ -585,7 +585,8 @@ public class TailFile extends AbstractProcessor {
for (TailFileObject tfo : states.values()) {
cleanReader(tfo);
final TailFileState state = tfo.getState();
tfo.setState(new TailFileState(state.getFilename(), state.getFile(), null, state.getPosition(), state.getTimestamp(), state.getLength(), state.getChecksum(), state.getBuffer()));
tfo.setState(new TailFileState(state.getFilename(), state.getFile(), null, state.getPosition(),
state.getTimestamp(), state.getLength(), state.getChecksum(), state.getBuffer(), state.isTailingPostRollover()));
}
}
@ -824,24 +825,6 @@ public class TailFile extends AbstractProcessor {
}
});
if (abort.get() != null) {
session.remove(flowFile);
final long newPosition = positionHolder.get();
try {
reader.position(newPosition);
} catch (IOException ex) {
getLogger().warn("Couldn't reposition the reader for {} due to {}", new Object[]{ file, ex }, ex);
try {
reader.close();
} catch (IOException ex2) {
getLogger().warn("Failed to close reader for {} due to {}", new Object[]{ file, ex2 }, ex2);
}
reader = null;
}
tfo.setState(new TailFileState(tailFile, file, reader, newPosition, timestamp, length, checksum, state.getBuffer()));
throw abort.get();
}
// If there ended up being no data, just remove the FlowFile
if (flowFile.getSize() == 0) {
session.remove(flowFile);
@ -882,6 +865,22 @@ public class TailFile extends AbstractProcessor {
tfo.setState(new TailFileState(tailFile, file, reader, position, timestamp, length, checksum, state.getBuffer()));
persistState(tfo, session, context);
if (abort.get() != null) {
final long newPosition = positionHolder.get();
try {
reader.position(newPosition);
} catch (IOException ex) {
getLogger().warn("Couldn't reposition the reader for {} due to {}", new Object[]{ file, ex }, ex);
try {
reader.close();
} catch (IOException ex2) {
getLogger().warn("Failed to close reader for {} due to {}", new Object[]{ file, ex2 }, ex2);
}
}
throw abort.get();
}
}
private long readLines(final FileChannel reader, final ByteBuffer buffer, final OutputStream out, final Checksum checksum, Boolean reReadOnNul) throws IOException {
@ -1208,8 +1207,8 @@ public class TailFile extends AbstractProcessor {
final boolean tailFirstFile;
if (rolloverOccurred) {
final File firstFile = rolledOffFiles.get(0);
final long millisSinceModified = System.currentTimeMillis() - firstFile.lastModified();
final boolean fileGrew = firstFile.length() >= position && position > 0;
final long millisSinceModified = getCurrentTimeMs() - firstFile.lastModified();
final boolean fileGrew = firstFile.length() >= position;
final boolean tailRolledFile = postRolloverTailMillis == 0 || millisSinceModified < postRolloverTailMillis;
tailFirstFile = fileGrew && tailRolledFile && expectedChecksum != null;
} else {
@ -1244,7 +1243,7 @@ public class TailFile extends AbstractProcessor {
// If we don't notice that the file has been modified, per the checks above, then we want to keep checking until the last modified
// date has eclipsed the configured value for the Post-Rollover Tail Period. Until then, return false. Once that occurs, we will
// consume the rest of the data, including the last line, even if it doesn't have a line ending.
final long millisSinceModified = System.currentTimeMillis() - newestFile.lastModified();
final long millisSinceModified = getCurrentTimeMs() - newestFile.lastModified();
if (millisSinceModified < postRolloverTailMillis) {
getLogger().debug("Rolled over file {} (size={}, lastModified={}) was modified {} millis ago, which isn't long enough to consume file fully without taking line endings into " +
"account. Will do nothing will file for now.", newestFile, newestFile.length(), newestFile.lastModified(), millisSinceModified);
@ -1343,7 +1342,7 @@ public class TailFile extends AbstractProcessor {
// updated values.
// But if we are not going to tail the rolled over file for any period of time, we can essentially reset the state.
final long postRolloverTailMillis = context.getProperty(POST_ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
final long millisSinceUpdate = System.currentTimeMillis() - timestamp;
final long millisSinceUpdate = getCurrentTimeMs() - timestamp;
if (tailingPostRollover && postRolloverTailMillis > 0) {
getLogger().debug("File {} has been rolled over, but it was updated {} millis ago, which is less than the configured {} ({} ms), so will continue tailing",
fileToTail, millisSinceUpdate, POST_ROLLOVER_TAIL_PERIOD.getDisplayName(), postRolloverTailMillis);
@ -1410,6 +1409,10 @@ public class TailFile extends AbstractProcessor {
return tfo.getState();
}
public long getCurrentTimeMs() {
return System.currentTimeMillis();
}
static class TailFileObject {
private TailFileState state;

View File

@ -0,0 +1,270 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.SystemUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class AbstractTestTailFileScenario {
public static final String TEST_DIRECTORY = "testTailFileScenario";
public static final String TARGET_FILE_PATH = "target/" + TEST_DIRECTORY + "/in.txt";
public static final String NUL_SUBSTITUTE = "X";
public static final Long POST_ROLLOVER_WAIT_PERSIOD_SECONDS = 100L;
protected File file;
protected RandomAccessFile randomAccessFile;
private TailFile processor;
protected TestRunner runner;
private AtomicBoolean stopAfterEachTrigger;
protected AtomicLong wordIndex;
protected AtomicLong rolloverIndex;
protected AtomicLong timeAdjustment;
protected AtomicBoolean rolloverSwitchPending;
protected LinkedList<Long> nulPositions;
protected List<String> expected;
private Random random;
@Before
public void setUp() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard", "TRACE");
clean();
File directory = new File("target/" + TEST_DIRECTORY);
if (!directory.exists()) {
assertTrue(directory.mkdirs());
}
createTargetFile();
randomAccessFile = new RandomAccessFile(file, "rw");
processor = new TailFile() {
@Override
public long getCurrentTimeMs() {
return super.getCurrentTimeMs() + timeAdjustment.get();
}
};
runner = TestRunners.newTestRunner(processor);
runner.setProperty(TailFile.FILENAME, TARGET_FILE_PATH);
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "in.txt*");
runner.setProperty(TailFile.REREAD_ON_NUL, "true");
runner.setProperty(TailFile.POST_ROLLOVER_TAIL_PERIOD, POST_ROLLOVER_WAIT_PERSIOD_SECONDS + " sec");
runner.assertValid();
runner.run(1, false, true);
stopAfterEachTrigger = new AtomicBoolean(false);
nulPositions = new LinkedList<>();
wordIndex = new AtomicLong(1);
rolloverIndex = new AtomicLong(1);
timeAdjustment = new AtomicLong(0);
rolloverSwitchPending = new AtomicBoolean(false);
expected = new ArrayList<>();
random = new Random();
}
@After
public void tearDown() throws IOException {
if (randomAccessFile != null) {
randomAccessFile.close();
}
processor.cleanup();
}
public void testScenario(List<Action> actions) throws Exception {
testScenario(actions, false);
tearDown();
setUp();
testScenario(actions, true);
}
public void testScenario(List<Action> actions, boolean stopAfterEachTrigger) throws Exception {
if (actions.contains(Action.ROLLOVER)) {
Assume.assumeTrue("Test wants to rename an open file which is not allowed on Windows", !SystemUtils.IS_OS_WINDOWS);
}
// GIVEN
this.stopAfterEachTrigger.set(stopAfterEachTrigger);
// WHEN
for (Action action : actions) {
action.run(this);
}
overwriteRemainingNuls();
Action.WRITE_NEW_LINE.run(this);
Action.TRIGGER.run(this);
Action.EXPIRE_ROLLOVER_WAIT_PERIOD.run(this);
Action.TRIGGER.run(this);
Action.TRIGGER.run(this);
// THEN
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS);
List<String> actual = flowFiles.stream()
.map(MockFlowFile::toByteArray)
.map(String::new)
.collect(Collectors.toList());
assertEquals(
stopAfterEachTrigger + " " + actions.toString(),
expected.stream().collect(Collectors.joining()),
actual.stream().collect(Collectors.joining())
);
}
private void clean() {
cleanFiles("target/" + TEST_DIRECTORY);
}
private void cleanFiles(String directory) {
final File targetDir = new File(directory);
if (targetDir.exists()) {
for (final File file : targetDir.listFiles()) {
file.delete();
}
}
}
private void createTargetFile() throws IOException {
file = new File(TARGET_FILE_PATH);
file.delete();
assertTrue(file.createNewFile());
}
private void overwriteRemainingNuls() throws Exception {
while (!nulPositions.isEmpty()) {
Action.OVERWRITE_NUL.run(this);
}
}
private void writeWord() throws IOException {
String word = "-word_" + wordIndex.getAndIncrement() + "-";
randomAccessFile.write(word.getBytes());
expected.add(word);
}
private void writeNewLine() throws IOException {
randomAccessFile.write("\n".getBytes());
expected.add("\n");
}
private void writeNul() throws IOException {
nulPositions.add(randomAccessFile.getFilePointer());
randomAccessFile.write("\0".getBytes());
expected.add(NUL_SUBSTITUTE);
}
private void overwriteNul() throws IOException {
if (!nulPositions.isEmpty()) {
Long nulPosition = nulPositions.remove(random.nextInt(nulPositions.size()));
long currentPosition = randomAccessFile.getFilePointer();
randomAccessFile.seek(nulPosition);
randomAccessFile.write(NUL_SUBSTITUTE.getBytes());
randomAccessFile.seek(currentPosition);
}
}
private void trigger() {
runner.run(1, stopAfterEachTrigger.get(), false);
}
private void rollover() throws IOException {
File rolledOverFile = new File(file.getParentFile(), file.getName() + "." + rolloverIndex.getAndIncrement());
file.renameTo(rolledOverFile);
createTargetFile();
rolloverSwitchPending.set(true);
}
private void switchFile() throws Exception {
if (rolloverSwitchPending.get()) {
overwriteRemainingNuls();
randomAccessFile.close();
randomAccessFile = new RandomAccessFile(file, "rw");
rolloverSwitchPending.set(false);
}
}
private void expireRolloverWaitPeriod() throws Exception {
long waitPeriod = POST_ROLLOVER_WAIT_PERSIOD_SECONDS * 1000 + 100;
timeAdjustment.set(timeAdjustment.get() + waitPeriod);
}
protected enum Action {
WRITE_WORD(AbstractTestTailFileScenario::writeWord),
WRITE_NEW_LINE(AbstractTestTailFileScenario::writeNewLine),
WRITE_NUL(AbstractTestTailFileScenario::writeNul),
OVERWRITE_NUL(AbstractTestTailFileScenario::overwriteNul),
TRIGGER(AbstractTestTailFileScenario::trigger),
ROLLOVER(AbstractTestTailFileScenario::rollover),
SWITCH_FILE(AbstractTestTailFileScenario::switchFile),
EXPIRE_ROLLOVER_WAIT_PERIOD(AbstractTestTailFileScenario::expireRolloverWaitPeriod);
private final ActionRunner actionRunner;
Action(ActionRunner actionRunner) {
this.actionRunner = actionRunner;
}
void run(AbstractTestTailFileScenario currentTest) throws Exception {
actionRunner.runAction(currentTest);
}
}
private interface ActionRunner {
void runAction(AbstractTestTailFileScenario currentTest) throws Exception;
}
}

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@Ignore("Stress test - longrunning. For manual testing.")
@RunWith(Parameterized.class)
public class TestTailFileGeneratedScenarios extends AbstractTestTailFileScenario {
private final List<Action> actions;
public TestTailFileGeneratedScenarios(List<Action> actions) {
this.actions = actions;
}
@Parameterized.Parameters
public static Collection parameters() {
Collection<Object[]> parameters = new ArrayList();
// Uncomment the portion for which to run the scenarios.
// They cannot be added to a single large batch because it opens too many files.
// List<Action> baseActions = Arrays.asList(
// Action.WRITE_WORD,
// Action.WRITE_NUL,
// Action.WRITE_NEW_LINE,
// Action.WRITE_NUL,
// Action.OVERWRITE_NUL,
// Action.WRITE_WORD,
// Action.OVERWRITE_NUL,
// Action.WRITE_NUL,
// Action.WRITE_NEW_LINE,
// Action.OVERWRITE_NUL,
// Action.WRITE_NUL,
// Action.OVERWRITE_NUL
// );
// addAction(parameters, Action.TRIGGER, baseActions, 0, 0);
// new ArrayList<>(parameters).forEach(anActionList -> addAction(parameters, Action.ROLLOVER, (List<Action>)anActionList[0], 0, 0));
// new ArrayList<>(parameters).forEach(anActionList -> addAction(parameters, Action.SWITCH_FILE, (List<Action>)anActionList[0], 0, 0));
// List<Action> baseActions = Arrays.asList(
// Action.WRITE_WORD,
// Action.WRITE_NEW_LINE,
// Action.WRITE_NUL,
// Action.OVERWRITE_NUL,
// Action.WRITE_WORD,
// Action.WRITE_NUL,
// Action.WRITE_NEW_LINE,
// Action.OVERWRITE_NUL,
// Action.ROLLOVER,
// Action.WRITE_WORD,
// Action.WRITE_NEW_LINE,
// Action.WRITE_NUL,
// Action.OVERWRITE_NUL,
// Action.WRITE_WORD,
// Action.WRITE_NUL,
// Action.WRITE_NEW_LINE,
// Action.OVERWRITE_NUL,
// Action.SWITCH_FILE,
// Action.WRITE_WORD,
// Action.WRITE_NEW_LINE,
// Action.WRITE_NUL,
// Action.OVERWRITE_NUL,
// Action.WRITE_WORD,
// Action.WRITE_NUL,
// Action.WRITE_NEW_LINE,
// Action.OVERWRITE_NUL
// );
// addAction(parameters, Action.TRIGGER, baseActions, 0, 1);
List<Action> baseActions = Arrays.asList(
Action.WRITE_WORD, Action.WRITE_WORD,
Action.WRITE_NEW_LINE, Action.WRITE_NEW_LINE,
Action.WRITE_NUL, Action.WRITE_NUL,
Action.WRITE_WORD, Action.WRITE_WORD,
Action.WRITE_NEW_LINE, Action.WRITE_NEW_LINE,
Action.OVERWRITE_NUL, Action.OVERWRITE_NUL,
Action.WRITE_NEW_LINE, Action.WRITE_NEW_LINE,
Action.WRITE_WORD, Action.WRITE_WORD,
Action.WRITE_NEW_LINE, Action.WRITE_NEW_LINE,
Action.ROLLOVER,
Action.WRITE_WORD, Action.WRITE_WORD,
Action.WRITE_NEW_LINE, Action.WRITE_NEW_LINE,
Action.WRITE_NUL, Action.WRITE_NUL,
Action.WRITE_WORD, Action.WRITE_WORD,
Action.WRITE_NEW_LINE, Action.WRITE_NEW_LINE,
Action.OVERWRITE_NUL, Action.OVERWRITE_NUL,
Action.WRITE_NEW_LINE, Action.WRITE_NEW_LINE,
Action.WRITE_WORD, Action.WRITE_WORD,
Action.WRITE_NEW_LINE, Action.WRITE_NEW_LINE,
Action.SWITCH_FILE,
Action.WRITE_WORD, Action.WRITE_WORD,
Action.WRITE_NEW_LINE, Action.WRITE_NEW_LINE,
Action.WRITE_NUL, Action.WRITE_NUL,
Action.WRITE_WORD, Action.WRITE_WORD,
Action.WRITE_NEW_LINE, Action.WRITE_NEW_LINE,
Action.OVERWRITE_NUL, Action.OVERWRITE_NUL,
Action.WRITE_NEW_LINE, Action.WRITE_NEW_LINE,
Action.WRITE_WORD, Action.WRITE_WORD,
Action.WRITE_NEW_LINE, Action.WRITE_NEW_LINE
);
addAction(parameters, Action.TRIGGER, baseActions, 0, 1);
return parameters;
}
private static void addAction(Collection<Object[]> parameters, Action action, List<Action> baseActions, int currentDepth, int recursiveDepth) {
for (int triggerIndex = 0; triggerIndex < baseActions.size(); triggerIndex++) {
List<Action> actions = new LinkedList<>(baseActions);
actions.add(triggerIndex, action);
parameters.add(new Object[]{ actions});
if (currentDepth < recursiveDepth) {
addAction(parameters, action, actions, currentDepth+1, recursiveDepth);
}
}
}
@Test
public void testScenario() throws Exception {
testScenario(actions);
}
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
public class TestTailFileSimpleScenarios extends AbstractTestTailFileScenario {
@Test
public void testSimpleScenario() throws Exception {
// GIVEN
List<Action> actions = Arrays.asList(
Action.WRITE_WORD, Action.WRITE_NEW_LINE,
Action.TRIGGER,
Action.ROLLOVER,
Action.TRIGGER,
Action.WRITE_WORD, Action.WRITE_NEW_LINE,
Action.WRITE_WORD, Action.WRITE_NUL,
Action.OVERWRITE_NUL, Action.WRITE_NEW_LINE,
Action.WRITE_WORD, Action.WRITE_NEW_LINE,
Action.SWITCH_FILE,
Action.WRITE_WORD, Action.WRITE_NEW_LINE
);
// WHEN
// THEN
testScenario(actions);
}
@Test
public void testSimpleScenario2() throws Exception {
// GIVEN
List<Action> actions = Arrays.asList(
Action.WRITE_WORD, Action.WRITE_NEW_LINE,
Action.WRITE_WORD,
Action.WRITE_NUL,
Action.TRIGGER,
Action.WRITE_WORD, Action.WRITE_NEW_LINE,
Action.OVERWRITE_NUL
);
// WHEN
// THEN
testScenario(actions);
}
@Test
public void testSimpleScenario3() throws Exception {
// GIVEN
List<Action> actions = Arrays.asList(
Action.WRITE_WORD,
Action.WRITE_NUL,
Action.TRIGGER,
Action.WRITE_WORD, Action.WRITE_NEW_LINE,
Action.OVERWRITE_NUL
);
// WHEN
// THEN
testScenario(actions);
}
@Test
public void testSimpleScenario4() throws Exception {
// GIVEN
List<Action> actions = Arrays.asList(
Action.WRITE_WORD, Action.WRITE_NEW_LINE,
Action.ROLLOVER,
Action.TRIGGER,
Action.WRITE_WORD, Action.WRITE_NEW_LINE,
Action.SWITCH_FILE,
Action.WRITE_WORD, Action.WRITE_NEW_LINE
);
// WHEN
// THEN
testScenario(actions);
}
@Test
public void testSimpleScenario5() throws Exception {
// GIVEN
List<Action> actions = Arrays.asList(
Action.WRITE_WORD, Action.WRITE_NEW_LINE,
Action.TRIGGER,
Action.WRITE_WORD, Action.WRITE_NEW_LINE,
Action.WRITE_NUL,
Action.TRIGGER,
Action.OVERWRITE_NUL,
Action.ROLLOVER,
Action.WRITE_WORD, Action.WRITE_NEW_LINE
);
// WHEN
// THEN
testScenario(actions);
}
}