NIFI-5000 - ListHDFS properly lists files from updated directory path

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2576.
This commit is contained in:
zenfenan 2018-03-22 22:43:52 +05:30 committed by Pierre Villard
parent ac9944ccee
commit c118e96238
2 changed files with 34 additions and 5 deletions

View File

@ -143,7 +143,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
private volatile long latestTimestampListed = -1L; private volatile long latestTimestampListed = -1L;
private volatile long latestTimestampEmitted = -1L; private volatile long latestTimestampEmitted = -1L;
private volatile long lastRunTimestamp = -1L; private volatile long lastRunTimestamp = -1L;
private volatile boolean resetState = false;
static final String LISTING_TIMESTAMP_KEY = "listing.timestamp"; static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
static final String EMITTED_TIMESTAMP_KEY = "emitted.timestamp"; static final String EMITTED_TIMESTAMP_KEY = "emitted.timestamp";
@ -202,8 +202,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
super.onPropertyModified(descriptor, oldValue, newValue); super.onPropertyModified(descriptor, oldValue, newValue);
if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) { if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) {
latestTimestampEmitted = -1L; this.resetState = true;
latestTimestampListed = -1L;
} }
} }
@ -283,8 +282,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
return toList; return toList;
} }
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
// We have to ensure that we don't continually perform listings, because if we perform two listings within // We have to ensure that we don't continually perform listings, because if we perform two listings within
@ -302,6 +299,12 @@ public class ListHDFS extends AbstractHadoopProcessor {
// Ensure that we are using the latest listing information before we try to perform a listing of HDFS files. // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
try { try {
if (resetState) {
getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
context.getStateManager().clear(Scope.CLUSTER);
this.resetState = false;
}
final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER); final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER);
if (stateMap.getVersion() == -1L) { if (stateMap.getVersion() == -1L) {
latestTimestampEmitted = -1L; latestTimestampEmitted = -1L;
@ -464,4 +467,5 @@ public class ListHDFS extends AbstractHadoopProcessor {
} }
}; };
} }
} }

View File

@ -381,6 +381,31 @@ public class TestListHDFS {
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2); runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
} }
@Test
public void testListAfterDirectoryChange() throws InterruptedException {
proc.fileSystem.addFileStatus(new Path("/test1"), new FileStatus(1L, false, 1, 1L, 100L,0L, create777(), "owner", "group", new Path("/test1/testFile-1_1.txt")));
proc.fileSystem.addFileStatus(new Path("/test2"), new FileStatus(1L, false, 1, 1L, 150L,0L, create777(), "owner", "group", new Path("/test2/testFile-2_1.txt")));
proc.fileSystem.addFileStatus(new Path("/test1"), new FileStatus(1L, false, 1, 1L, 200L,0L, create777(), "owner", "group", new Path("/test1/testFile-1_2.txt")));
runner.setProperty(ListHDFS.DIRECTORY, "/test1");
runner.run(); // Initial run, latest file from /test1 will be ignored
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run(); // Latest file i.e. testFile-1_2.txt from /test1 should also be picked up now
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
runner.setProperty(ListHDFS.DIRECTORY, "/test2"); // Changing directory should reset the state
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run(); // Will ignore the files for this cycle
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
runner.run(); // Since state has been reset, testFile-2_1.txt from /test2 should be picked up
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3);
}
private FsPermission create777() { private FsPermission create777() {
return new FsPermission((short) 0777); return new FsPermission((short) 0777);