NIFI-11902: Fix ListHDFS closes FileSystem in first run

This closes #7565.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
Lehel Boer authored on 2023-08-03 00:07:38 +02:00, committed by Peter Turcsanyi
parent 49a350a765
commit 6a8a8caa4c
1 changed file with 42 additions and 44 deletions
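
Context for the diff below: by default, Hadoop's FileSystem.get() hands back a cached instance that is shared by later callers, so closing it at the end of the first onTrigger (as the removed try-with-resources did) leaves every subsequent run of ListHDFS with a closed filesystem. The change drops the try-with-resources and the close-time catch block so the FileSystem stays open across runs. The following standalone sketch is not part of the commit; it only illustrates the underlying Hadoop behavior and assumes fs.defaultFS points at an HDFS cluster (class and path names are illustrative).

// Standalone sketch (not part of this commit) of why closing the FileSystem is unsafe.
// Assumes fs.defaultFS points at an HDFS cluster; class and path names are illustrative.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileSystemCacheSketch {
    public static void main(String[] args) throws Exception {
        final Configuration conf = new Configuration();

        // First "run": works, but the try-with-resources closes the FileSystem on exit.
        try (final FileSystem hdfs = FileSystem.get(conf)) {
            hdfs.exists(new Path("/tmp"));
        }

        // Second "run": FileSystem.get() returns the same cached (and now closed) instance
        // by default, so this call fails with "java.io.IOException: Filesystem closed".
        final FileSystem hdfs = FileSystem.get(conf);
        hdfs.exists(new Path("/tmp"));
    }
}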

@@ -257,58 +257,56 @@ public class ListHDFS extends AbstractHadoopProcessor {
         }
 
         // Pull in any file that is newer than the timestamp that we have.
-        try (final FileSystem hdfs = getFileSystem()) {
-            final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
-            final PathFilter pathFilter = createPathFilter(context);
-            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final FileSystem hdfs = getFileSystem();
+        final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+        final PathFilter pathFilter = createPathFilter(context);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-            final FileStatusManager fileStatusManager = new FileStatusManager(latestTimestamp, latestFiles);
-            final Path rootPath = getNormalizedPath(context, DIRECTORY);
-            final FileStatusIterable fileStatusIterable = new FileStatusIterable(rootPath, recursive, hdfs, getUserGroupInformation());
+        final FileStatusManager fileStatusManager = new FileStatusManager(latestTimestamp, latestFiles);
+        final Path rootPath = getNormalizedPath(context, DIRECTORY);
+        final FileStatusIterable fileStatusIterable = new FileStatusIterable(rootPath, recursive, hdfs, getUserGroupInformation());
 
-            final Long minAgeProp = context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-            final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
-            final Long maxAgeProp = context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-            final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
+        final Long minAgeProp = context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
+        final Long maxAgeProp = context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
 
-            final HadoopFileStatusWriter writer = HadoopFileStatusWriter.builder()
-                    .session(session)
-                    .successRelationship(getSuccessRelationship())
-                    .fileStatusIterable(fileStatusIterable)
-                    .fileStatusManager(fileStatusManager)
-                    .pathFilter(pathFilter)
-                    .minimumAge(minimumAge)
-                    .maximumAge(maximumAge)
-                    .previousLatestTimestamp(latestTimestamp)
-                    .previousLatestFiles(latestFiles)
-                    .writerFactory(writerFactory)
-                    .hdfsPrefix(getAttributePrefix())
-                    .logger(getLogger())
-                    .build();
+        final HadoopFileStatusWriter writer = HadoopFileStatusWriter.builder()
+                .session(session)
+                .successRelationship(getSuccessRelationship())
+                .fileStatusIterable(fileStatusIterable)
+                .fileStatusManager(fileStatusManager)
+                .pathFilter(pathFilter)
+                .minimumAge(minimumAge)
+                .maximumAge(maximumAge)
+                .previousLatestTimestamp(latestTimestamp)
+                .previousLatestFiles(latestFiles)
+                .writerFactory(writerFactory)
+                .hdfsPrefix(getAttributePrefix())
+                .logger(getLogger())
+                .build();
 
-            writer.write();
+        writer.write();
 
-            getLogger().debug("Found a total of {} files in HDFS, {} are listed", fileStatusIterable.getTotalFileCount(), writer.getListedFileCount());
+        getLogger().debug("Found a total of {} files in HDFS, {} are listed", fileStatusIterable.getTotalFileCount(), writer.getListedFileCount());
 
-            if (writer.getListedFileCount() > 0) {
-                final Map<String, String> updatedState = new HashMap<>();
-                updatedState.put(LATEST_TIMESTAMP_KEY, String.valueOf(fileStatusManager.getCurrentLatestTimestamp()));
-                final List<String> files = fileStatusManager.getCurrentLatestFiles();
-                for (int i = 0; i < files.size(); i++) {
-                    final String currentFilePath = files.get(i);
-                    updatedState.put(String.format(LATEST_FILES_KEY, i), currentFilePath);
-                }
-                getLogger().debug("New state map: {}", updatedState);
-                updateState(session, updatedState);
+        if (writer.getListedFileCount() > 0) {
+            final Map<String, String> updatedState = new HashMap<>();
+            updatedState.put(LATEST_TIMESTAMP_KEY, String.valueOf(fileStatusManager.getCurrentLatestTimestamp()));
+            final List<String> files = fileStatusManager.getCurrentLatestFiles();
+            for (int i = 0; i < files.size(); i++) {
+                final String currentFilePath = files.get(i);
+                updatedState.put(String.format(LATEST_FILES_KEY, i), currentFilePath);
+            }
+            getLogger().debug("New state map: {}", updatedState);
+            updateState(session, updatedState);
 
-                getLogger().info("Successfully created listing with {} new files from HDFS", writer.getListedFileCount());
-            } else {
-                getLogger().debug("There is no data to list. Yielding.");
-                context.yield();
-            }
-        } catch (IOException e) {
-            throw new ProcessException("IO error occurred when closing HDFS file system", e);
+            getLogger().info("Successfully created listing with {} new files from HDFS", writer.getListedFileCount());
+        } else {
+            getLogger().debug("There is no data to list. Yielding.");
+            context.yield();
         }
     }
 
     private PathFilter createPathFilter(final ProcessContext context) {
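
If the shared FileSystem ever does need to be released, the per-trigger close removed above is not the place for it; a lifecycle callback that runs once when the processor is stopped is the usual alternative. The sketch below is hypothetical, it is not how AbstractHadoopProcessor is implemented, and the class, field, and method names are invented for illustration.

// Hypothetical sketch only: release the FileSystem once, when the processor stops,
// rather than at the end of each onTrigger. Names below are illustrative.
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.nifi.annotation.lifecycle.OnStopped;

public class FileSystemLifecycleSketch {

    private volatile FileSystem fileSystem;   // obtained once and reused across triggers

    protected FileSystem getFileSystem() {
        return fileSystem;                    // never closed inside onTrigger
    }

    @OnStopped
    public void shutdownFileSystem() {
        final FileSystem fs = fileSystem;
        fileSystem = null;
        if (fs != null) {
            try {
                fs.close();                   // safe here: no further onTrigger calls will run
            } catch (final IOException e) {
                // best-effort cleanup on shutdown; a real processor would log this
            }
        }
    }
}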