mirror of https://github.com/apache/nifi.git
NIFI-4747 - Removed directory existence check in GetHDFS
This closes #2391 Signed-off-by: Jeremy Dyer <jeremydyer@apache.org>
This commit is contained in:
parent
6e7dfb9935
commit
dc67bd2fdd
|
@ -238,11 +238,6 @@ public class GetHDFS extends AbstractHadoopProcessor {
|
||||||
abstractOnScheduled(context);
|
abstractOnScheduled(context);
|
||||||
// copy configuration values to pass them around cleanly
|
// copy configuration values to pass them around cleanly
|
||||||
processorConfig = new ProcessorConfiguration(context);
|
processorConfig = new ProcessorConfiguration(context);
|
||||||
final FileSystem fs = getFileSystem();
|
|
||||||
final Path dir = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
|
|
||||||
if (!fs.exists(dir)) {
|
|
||||||
throw new IOException("PropertyDescriptor " + DIRECTORY + " has invalid value " + dir + ". The directory does not exist.");
|
|
||||||
}
|
|
||||||
|
|
||||||
// forget the state of the queue in case HDFS contents changed while this processor was turned off
|
// forget the state of the queue in case HDFS contents changed while this processor was turned off
|
||||||
queueLock.lock();
|
queueLock.lock();
|
||||||
|
@ -422,8 +417,16 @@ public class GetHDFS extends AbstractHadoopProcessor {
|
||||||
if (System.currentTimeMillis() >= nextPollTime && listingLock.tryLock()) {
|
if (System.currentTimeMillis() >= nextPollTime && listingLock.tryLock()) {
|
||||||
try {
|
try {
|
||||||
final FileSystem hdfs = getFileSystem();
|
final FileSystem hdfs = getFileSystem();
|
||||||
|
final Path directoryPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
|
||||||
|
|
||||||
|
if (!hdfs.exists(directoryPath)) {
|
||||||
|
context.yield();
|
||||||
|
getLogger().warn("The directory {} does not exist.", new Object[]{directoryPath});
|
||||||
|
} else {
|
||||||
// get listing
|
// get listing
|
||||||
listing = selectFiles(hdfs, new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue()), null);
|
listing = selectFiles(hdfs, directoryPath, null);
|
||||||
|
}
|
||||||
|
|
||||||
lastPollTime.set(System.currentTimeMillis());
|
lastPollTime.set(System.currentTimeMillis());
|
||||||
} finally {
|
} finally {
|
||||||
listingLock.unlock();
|
listingLock.unlock();
|
||||||
|
@ -447,10 +450,6 @@ public class GetHDFS extends AbstractHadoopProcessor {
|
||||||
filesVisited = new HashSet<>();
|
filesVisited = new HashSet<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!hdfs.exists(dir)) {
|
|
||||||
throw new IOException("Selection directory " + dir.toString() + " doesn't appear to exist!");
|
|
||||||
}
|
|
||||||
|
|
||||||
final Set<Path> files = new HashSet<>();
|
final Set<Path> files = new HashSet<>();
|
||||||
|
|
||||||
FileStatus[] fileStatuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(dir));
|
FileStatus[] fileStatuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(dir));
|
||||||
|
|
|
@ -142,6 +142,17 @@ public class GetHDFSTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDirectoryDoesNotExist() {
|
||||||
|
GetHDFS proc = new TestableGetHDFS(kerberosProperties);
|
||||||
|
TestRunner runner = TestRunners.newTestRunner(proc);
|
||||||
|
runner.setProperty(PutHDFS.DIRECTORY, "does/not/exist/${now():format('yyyyMMdd')}");
|
||||||
|
runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
|
||||||
|
runner.run();
|
||||||
|
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
|
||||||
|
assertEquals(0, flowFiles.size());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAutomaticDecompression() throws IOException {
|
public void testAutomaticDecompression() throws IOException {
|
||||||
GetHDFS proc = new TestableGetHDFS(kerberosProperties);
|
GetHDFS proc = new TestableGetHDFS(kerberosProperties);
|
||||||
|
|
Loading…
Reference in New Issue