mirror of https://github.com/apache/nifi.git
NIFI-3141: Fixed TailFile ArrayIndexOutOfBounds.
- Added unit test cases to simulate NiFi version update which fails without this fix. - Added state object migration code, add file.0. prefix to state keys, and add length from stored position. This closes #1289
This commit is contained in:
parent
afe742700c
commit
8da38acf31
|
@ -345,6 +345,25 @@ public class TailFile extends AbstractProcessor {
|
||||||
|
|
||||||
Map<String, String> statesMap = stateMap.toMap();
|
Map<String, String> statesMap = stateMap.toMap();
|
||||||
|
|
||||||
|
if (statesMap.containsKey(TailFileState.StateKeys.FILENAME)
|
||||||
|
&& !statesMap.keySet().stream().anyMatch(key -> key.startsWith(MAP_PREFIX))) {
|
||||||
|
// If statesMap contains "filename" key without "file.0." prefix,
|
||||||
|
// and there's no key with "file." prefix, then
|
||||||
|
// it indicates that the statesMap is created with earlier version of NiFi.
|
||||||
|
// In this case, we need to migrate the state by adding prefix indexed with 0.
|
||||||
|
final Map<String, String> migratedStatesMap = new HashMap<>(statesMap.size());
|
||||||
|
for (String key : statesMap.keySet()) {
|
||||||
|
migratedStatesMap.put(MAP_PREFIX + "0." + key, statesMap.get(key));
|
||||||
|
}
|
||||||
|
|
||||||
|
// LENGTH is added from NiFi 1.1.0. Set the value with using the last position so that we can use existing state
|
||||||
|
// to avoid sending duplicated log data after updating NiFi.
|
||||||
|
migratedStatesMap.put(MAP_PREFIX + "0." + TailFileState.StateKeys.LENGTH, statesMap.get(TailFileState.StateKeys.POSITION));
|
||||||
|
statesMap = Collections.unmodifiableMap(migratedStatesMap);
|
||||||
|
|
||||||
|
getLogger().info("statesMap has been migrated. {}", new Object[]{migratedStatesMap});
|
||||||
|
}
|
||||||
|
|
||||||
initStates(filesToTail, statesMap, false);
|
initStates(filesToTail, statesMap, false);
|
||||||
recoverState(context, filesToTail, statesMap);
|
recoverState(context, filesToTail, statesMap);
|
||||||
}
|
}
|
||||||
|
@ -931,6 +950,15 @@ public class TailFile extends AbstractProcessor {
|
||||||
Map<String, String> updatedState = new HashMap<String, String>();
|
Map<String, String> updatedState = new HashMap<String, String>();
|
||||||
|
|
||||||
for(String key : oldState.toMap().keySet()) {
|
for(String key : oldState.toMap().keySet()) {
|
||||||
|
// These states are stored by older version of NiFi, and won't be used anymore.
|
||||||
|
// New states have 'file.<index>.' prefix.
|
||||||
|
if (TailFileState.StateKeys.CHECKSUM.equals(key)
|
||||||
|
|| TailFileState.StateKeys.FILENAME.equals(key)
|
||||||
|
|| TailFileState.StateKeys.POSITION.equals(key)
|
||||||
|
|| TailFileState.StateKeys.TIMESTAMP.equals(key)) {
|
||||||
|
getLogger().info("Removed state {}={} stored by older version of NiFi.", new Object[]{key, oldState.get(key)});
|
||||||
|
continue;
|
||||||
|
}
|
||||||
updatedState.put(key, oldState.get(key));
|
updatedState.put(key, oldState.get(key));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,16 +18,26 @@ package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.BufferedWriter;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
import java.io.FilenameFilter;
|
import java.io.FilenameFilter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStreamWriter;
|
||||||
import java.io.RandomAccessFile;
|
import java.io.RandomAccessFile;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import org.apache.nifi.components.state.Scope;
|
||||||
|
import org.apache.nifi.components.state.StateMap;
|
||||||
import org.apache.nifi.processors.standard.TailFile.TailFileState;
|
import org.apache.nifi.processors.standard.TailFile.TailFileState;
|
||||||
|
import org.apache.nifi.state.MockStateManager;
|
||||||
|
import org.apache.nifi.stream.io.ByteArrayOutputStream;
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
@ -39,6 +49,7 @@ import org.junit.Test;
|
||||||
public class TestTailFile {
|
public class TestTailFile {
|
||||||
|
|
||||||
private File file;
|
private File file;
|
||||||
|
private File existingFile;
|
||||||
private File otherFile;
|
private File otherFile;
|
||||||
|
|
||||||
private RandomAccessFile raf;
|
private RandomAccessFile raf;
|
||||||
|
@ -56,6 +67,19 @@ public class TestTailFile {
|
||||||
file.delete();
|
file.delete();
|
||||||
assertTrue(file.createNewFile());
|
assertTrue(file.createNewFile());
|
||||||
|
|
||||||
|
existingFile = new File("target/existing-log.txt");
|
||||||
|
existingFile.delete();
|
||||||
|
assertTrue(existingFile.createNewFile());
|
||||||
|
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(existingFile)))) {
|
||||||
|
writer.write("Line 1");
|
||||||
|
writer.newLine();
|
||||||
|
writer.write("Line 2");
|
||||||
|
writer.newLine();
|
||||||
|
writer.write("Line 3");
|
||||||
|
writer.newLine();
|
||||||
|
writer.flush();
|
||||||
|
}
|
||||||
|
|
||||||
File directory = new File("target/testDir");
|
File directory = new File("target/testDir");
|
||||||
if(!directory.exists()) {
|
if(!directory.exists()) {
|
||||||
assertTrue(directory.mkdirs());
|
assertTrue(directory.mkdirs());
|
||||||
|
@ -812,6 +836,76 @@ public class TestTailFile {
|
||||||
runner.clearTransferState();
|
runner.clearTransferState();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMigrateFrom100To110() throws IOException {
|
||||||
|
|
||||||
|
runner.setProperty(TailFile.FILENAME, "target/existing-log.txt");
|
||||||
|
|
||||||
|
final MockStateManager stateManager = runner.getStateManager();
|
||||||
|
|
||||||
|
// Before NiFi 1.1.0, TailFile only handles single file
|
||||||
|
// and state key doesn't have index in it.
|
||||||
|
final Map<String, String> state = new HashMap<>();
|
||||||
|
state.put("filename", "target/existing-log.txt");
|
||||||
|
// Simulate that it has been tailed up to the 2nd line.
|
||||||
|
state.put("checksum", "2279929157");
|
||||||
|
state.put("position", "14");
|
||||||
|
state.put("timestamp", "1480639134000");
|
||||||
|
stateManager.setState(state, Scope.LOCAL);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
|
||||||
|
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).iterator().next();
|
||||||
|
|
||||||
|
final ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||||
|
try (final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(bos))) {
|
||||||
|
writer.write("Line 3");
|
||||||
|
writer.newLine();
|
||||||
|
}
|
||||||
|
|
||||||
|
flowFile.assertContentEquals(bos.toByteArray());
|
||||||
|
|
||||||
|
// The old states should be replaced with new ones.
|
||||||
|
final StateMap updatedState = stateManager.getState(Scope.LOCAL);
|
||||||
|
assertNull(updatedState.get("filename"));
|
||||||
|
assertNull(updatedState.get("checksum"));
|
||||||
|
assertNull(updatedState.get("position"));
|
||||||
|
assertNull(updatedState.get("timestamp"));
|
||||||
|
assertEquals("target/existing-log.txt", updatedState.get("file.0.filename"));
|
||||||
|
assertEquals("3380848603", updatedState.get("file.0.checksum"));
|
||||||
|
assertEquals("21", updatedState.get("file.0.position"));
|
||||||
|
assertNotNull(updatedState.get("file.0.timestamp"));
|
||||||
|
|
||||||
|
// When it runs again, the state is already migrated, so it shouldn't emit any flow files.
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMigrateFrom100To110FileNotFound() throws IOException {
|
||||||
|
|
||||||
|
runner.setProperty(TailFile.FILENAME, "target/not-existing-log.txt");
|
||||||
|
|
||||||
|
final MockStateManager stateManager = runner.getStateManager();
|
||||||
|
|
||||||
|
// Before NiFi 1.1.0, TailFile only handles single file
|
||||||
|
// and state key doesn't have index in it.
|
||||||
|
final Map<String, String> state = new HashMap<>();
|
||||||
|
state.put("filename", "target/not-existing-log.txt");
|
||||||
|
// Simulate that it has been tailed up to the 2nd line.
|
||||||
|
state.put("checksum", "2279929157");
|
||||||
|
state.put("position", "14");
|
||||||
|
state.put("timestamp", "1480639134000");
|
||||||
|
stateManager.setState(state, Scope.LOCAL);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertTransferCount(TailFile.REL_SUCCESS, 0);
|
||||||
|
}
|
||||||
|
|
||||||
private void cleanFiles(String directory) {
|
private void cleanFiles(String directory) {
|
||||||
final File targetDir = new File(directory);
|
final File targetDir = new File(directory);
|
||||||
if(targetDir.exists()) {
|
if(targetDir.exists()) {
|
||||||
|
|
Loading…
Reference in New Issue