NIFI-11817: Improve ListHDFS extensibility

Fix review items

This closes #7484
Signed-off-by: Bence Simon <bsimon@apache.org>
Authored by Lehel on 2023-07-17 10:51:21 +02:00; committed by Bence Simon
parent bdd2a2b24b
commit a3fe9d121d
4 changed files with 249 additions and 68 deletions

ListHDFS.java

@@ -46,9 +46,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
import org.apache.nifi.processors.hadoop.util.FileStatusManager;
import org.apache.nifi.processors.hadoop.util.FilterMode;
import org.apache.nifi.processors.hadoop.util.writer.FlowFileObjectWriter;
import org.apache.nifi.processors.hadoop.util.writer.HadoopFileStatusWriter;
import org.apache.nifi.processors.hadoop.util.writer.RecordObjectWriter;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -100,6 +98,7 @@ import static org.apache.nifi.processors.hadoop.util.FilterMode.FILTER_DIRECTORI
public class ListHDFS extends AbstractHadoopProcessor {
private static final String NON_HIDDEN_FILES_REGEX = "[^\\.].*";
private static final String HDFS_ATTRIBUTE_PREFIX = "hdfs";
public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
.name("Recurse Subdirectories")
@@ -163,6 +162,8 @@ public class ListHDFS extends AbstractHadoopProcessor {
public static final String LATEST_TIMESTAMP_KEY = "latest.timestamp";
public static final String LATEST_FILES_KEY = "latest.file.%d";
private static final List<PropertyDescriptor> LIST_HDFS_PROPERTIES = Arrays.asList(
DIRECTORY, RECURSE_SUBDIRS, RECORD_WRITER, FILE_FILTER, FILE_FILTER_MODE, MINIMUM_FILE_AGE, MAXIMUM_FILE_AGE);
private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
private Pattern fileFilterRegexPattern;
private volatile boolean resetState = false;
@@ -177,7 +178,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> props = new ArrayList<>(properties);
props.addAll(Arrays.asList(DIRECTORY, RECURSE_SUBDIRS, RECORD_WRITER, FILE_FILTER, FILE_FILTER_MODE, MINIMUM_FILE_AGE, MAXIMUM_FILE_AGE));
props.addAll(LIST_HDFS_PROPERTIES);
return props;
}
@@ -263,24 +264,31 @@
final FileStatusManager fileStatusManager = new FileStatusManager(latestTimestamp, latestFiles);
final Path rootPath = getNormalizedPath(context, DIRECTORY);
final FileStatusIterable fileStatuses = new FileStatusIterable(rootPath, recursive, hdfs, getUserGroupInformation());
final FileStatusIterable fileStatusIterable = new FileStatusIterable(rootPath, recursive, hdfs, getUserGroupInformation());
final Long minAgeProp = context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
final Long maxAgeProp = context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
final HadoopFileStatusWriter writer;
if (writerFactory == null) {
writer = new FlowFileObjectWriter(session, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager, latestTimestamp, latestFiles);
} else {
writer = new RecordObjectWriter(session, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager, latestTimestamp,
latestFiles, writerFactory, getLogger());
}
final HadoopFileStatusWriter writer = HadoopFileStatusWriter.builder()
.session(session)
.successRelationship(getSuccessRelationship())
.fileStatusIterable(fileStatusIterable)
.fileStatusManager(fileStatusManager)
.pathFilter(pathFilter)
.minimumAge(minimumAge)
.maximumAge(maximumAge)
.previousLatestTimestamp(latestTimestamp)
.previousLatestFiles(latestFiles)
.writerFactory(writerFactory)
.hdfsPrefix(getAttributePrefix())
.logger(getLogger())
.build();
writer.write();
getLogger().debug("Found a total of {} files in HDFS, {} are listed", fileStatuses.getTotalFileCount(), writer.getListedFileCount());
getLogger().debug("Found a total of {} files in HDFS, {} are listed", fileStatusIterable.getTotalFileCount(), writer.getListedFileCount());
if (writer.getListedFileCount() > 0) {
final Map<String, String> updatedState = new HashMap<>();
@@ -335,4 +343,12 @@
getLogger().warn("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", e);
}
}
protected Relationship getSuccessRelationship() {
return REL_SUCCESS;
}
protected String getAttributePrefix() {
return HDFS_ATTRIBUTE_PREFIX;
}
}
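
The two protected accessors added at the bottom of ListHDFS are the new extension points of this change. A minimal sketch of a hypothetical subclass (the class name and prefix are illustrative, not part of this commit):

import org.apache.nifi.processors.hadoop.ListHDFS;

// Hypothetical subclass showing the new extension points; illustrative only.
public class ListCustomFs extends ListHDFS {

    @Override
    protected String getAttributePrefix() {
        // Listed FlowFiles now carry customfs.owner, customfs.group,
        // customfs.lastModified, customfs.length, etc. instead of hdfs.*
        return "customfs";
    }

    // getSuccessRelationship() can be overridden the same way; a subclass that
    // introduces its own Relationship must also expose it in its relationship set.
}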

FlowFileObjectWriter.java → FlowFileHadoopFileStatusWriter.java (renamed)

@@ -20,28 +20,31 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.PathFilter;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.hadoop.ListHDFS;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
import org.apache.nifi.processors.hadoop.util.FileStatusManager;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class FlowFileObjectWriter extends HadoopFileStatusWriter {
private static final String HDFS_ATTRIBUTE_PREFIX = "hdfs";
public FlowFileObjectWriter(final ProcessSession session,
final FileStatusIterable fileStatuses,
public class FlowFileHadoopFileStatusWriter extends HadoopFileStatusWriter {
public FlowFileHadoopFileStatusWriter(final ProcessSession session,
final Relationship successRelationship,
final FileStatusIterable fileStatusIterable,
final FileStatusManager fileStatusManager,
final PathFilter pathFilter,
final long minimumAge,
final long maximumAge,
final PathFilter pathFilter,
final FileStatusManager fileStatusManager,
final long previousLatestModificationTime,
final List<String> previousLatestFiles) {
super(session, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager, previousLatestModificationTime, previousLatestFiles);
final long previousLatestTimestamp,
final List<String> previousLatestFiles,
final RecordSetWriterFactory writerFactory,
final String hdfsPrefix,
final ComponentLog logger) {
super(session, successRelationship, fileStatusIterable, fileStatusManager, pathFilter, minimumAge, maximumAge, previousLatestTimestamp, previousLatestFiles, writerFactory, hdfsPrefix, logger);
}
@Override
@@ -52,7 +55,7 @@ public class FlowFileObjectWriter extends HadoopFileStatusWriter {
final Map<String, String> attributes = createAttributes(status);
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, ListHDFS.REL_SUCCESS);
session.transfer(flowFile, successRelationship);
fileStatusManager.update(status);
fileCount++;
@@ -64,12 +67,12 @@ public class FlowFileObjectWriter extends HadoopFileStatusWriter {
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName());
attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent()));
attributes.put(HDFS_ATTRIBUTE_PREFIX + ".owner", status.getOwner());
attributes.put(HDFS_ATTRIBUTE_PREFIX + ".group", status.getGroup());
attributes.put(HDFS_ATTRIBUTE_PREFIX + ".lastModified", String.valueOf(status.getModificationTime()));
attributes.put(HDFS_ATTRIBUTE_PREFIX + ".length", String.valueOf(status.getLen()));
attributes.put(HDFS_ATTRIBUTE_PREFIX + ".replication", String.valueOf(status.getReplication()));
attributes.put(HDFS_ATTRIBUTE_PREFIX + ".permissions", getPermissionsString(status.getPermission()));
attributes.put(hdfsPrefix + ".owner", status.getOwner());
attributes.put(hdfsPrefix + ".group", status.getGroup());
attributes.put(hdfsPrefix + ".lastModified", String.valueOf(status.getModificationTime()));
attributes.put(hdfsPrefix + ".length", String.valueOf(status.getLen()));
attributes.put(hdfsPrefix + ".replication", String.valueOf(status.getReplication()));
attributes.put(hdfsPrefix + ".permissions", getPermissionsString(status.getPermission()));
return attributes;
}
}
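
For reference, with the default prefix createAttributes produces entries like the following for each listed file (values illustrative):

filename          = part-0000.txt
path              = /data/in
hdfs.owner        = nifi
hdfs.group        = supergroup
hdfs.lastModified = 1689581481000
hdfs.length       = 4096
hdfs.replication  = 3
hdfs.permissions  = rw-r--r--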

HadoopFileStatusWriter.java

@@ -21,9 +21,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
import org.apache.nifi.processors.hadoop.util.FileStatusManager;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import java.util.List;
@@ -33,33 +36,45 @@ import java.util.List;
public abstract class HadoopFileStatusWriter {
protected final ProcessSession session;
protected final Relationship successRelationship;
protected final FileStatusIterable fileStatusIterable;
protected final FileStatusManager fileStatusManager;
protected final PathFilter pathFilter;
protected final long minimumAge;
protected final long maximumAge;
protected final PathFilter pathFilter;
protected final FileStatusManager fileStatusManager;
protected final long previousLatestTimestamp;
protected final List<String> previousLatestFiles;
protected final RecordSetWriterFactory writerFactory;
protected final String hdfsPrefix;
protected final ComponentLog logger;
protected final long currentTimeMillis;
protected long fileCount;
private final long currentTimeMillis;
HadoopFileStatusWriter(final ProcessSession session,
public HadoopFileStatusWriter(final ProcessSession session,
final Relationship successRelationship,
final FileStatusIterable fileStatusIterable,
final FileStatusManager fileStatusManager,
final PathFilter pathFilter,
final long minimumAge,
final long maximumAge,
final PathFilter pathFilter,
final FileStatusManager fileStatusManager,
final long previousLatestTimestamp,
final List<String> previousLatestFiles) {
final List<String> previousLatestFiles,
final RecordSetWriterFactory writerFactory,
final String hdfsPrefix,
final ComponentLog logger) {
this.session = session;
this.successRelationship = successRelationship;
this.fileStatusIterable = fileStatusIterable;
this.fileStatusManager = fileStatusManager;
this.pathFilter = pathFilter;
this.minimumAge = minimumAge;
this.maximumAge = maximumAge;
this.pathFilter = pathFilter;
this.fileStatusManager = fileStatusManager;
this.previousLatestTimestamp = previousLatestTimestamp;
this.previousLatestFiles = previousLatestFiles;
this.writerFactory = writerFactory;
this.hdfsPrefix = hdfsPrefix;
this.logger = logger;
currentTimeMillis = System.currentTimeMillis();
fileCount = 0L;
}
@@ -110,4 +125,152 @@ public abstract class HadoopFileStatusWriter {
return String.format("%s%s%s", getPerms(permission.getUserAction()),
getPerms(permission.getGroupAction()), getPerms(permission.getOtherAction()));
}
public static HadoopFileStatusWriter.Builder builder() {
return new HadoopFileStatusWriter.Builder();
}
public static class Builder {
private ProcessSession session;
private FileStatusIterable fileStatusIterable;
private long minimumAge;
private long maximumAge;
private PathFilter pathFilter;
private FileStatusManager fileStatusManager;
private long previousLatestTimestamp;
private List<String> previousLatestFiles;
private Relationship successRelationship;
private RecordSetWriterFactory writerFactory;
private ComponentLog logger;
private String hdfsPrefix;
public HadoopFileStatusWriter.Builder session(final ProcessSession session) {
this.session = session;
return this;
}
public HadoopFileStatusWriter.Builder fileStatusIterable(final FileStatusIterable fileStatusIterable) {
this.fileStatusIterable = fileStatusIterable;
return this;
}
public HadoopFileStatusWriter.Builder minimumAge(final long minimumAge) {
this.minimumAge = minimumAge;
return this;
}
public HadoopFileStatusWriter.Builder maximumAge(final long maximumAge) {
this.maximumAge = maximumAge;
return this;
}
public HadoopFileStatusWriter.Builder pathFilter(final PathFilter pathFilter) {
this.pathFilter = pathFilter;
return this;
}
public HadoopFileStatusWriter.Builder fileStatusManager(final FileStatusManager fileStatusManager) {
this.fileStatusManager = fileStatusManager;
return this;
}
public HadoopFileStatusWriter.Builder previousLatestTimestamp(final long previousLatestTimestamp) {
this.previousLatestTimestamp = previousLatestTimestamp;
return this;
}
public HadoopFileStatusWriter.Builder previousLatestFiles(final List<String> previousLatestFiles) {
this.previousLatestFiles = previousLatestFiles;
return this;
}
public HadoopFileStatusWriter.Builder successRelationship(final Relationship successRelationship) {
this.successRelationship = successRelationship;
return this;
}
public HadoopFileStatusWriter.Builder writerFactory(final RecordSetWriterFactory writerFactory) {
this.writerFactory = writerFactory;
return this;
}
public HadoopFileStatusWriter.Builder logger(final ComponentLog logger) {
this.logger = logger;
return this;
}
public HadoopFileStatusWriter.Builder hdfsPrefix(final String hdfsPrefix) {
this.hdfsPrefix = hdfsPrefix;
return this;
}
public HadoopFileStatusWriter build() {
validateMandatoryField("session", session);
validateMandatoryField("successRelationship", successRelationship);
validateMandatoryField("fileStatusIterable", fileStatusIterable);
validateMandatoryField("fileStatusManager", fileStatusManager);
if (writerFactory == null) {
return new FlowFileHadoopFileStatusWriter(session, successRelationship, fileStatusIterable, fileStatusManager, pathFilter, minimumAge, maximumAge,
previousLatestTimestamp, previousLatestFiles, writerFactory, hdfsPrefix, logger);
} else {
return new RecordHadoopFileStatusWriter(session, successRelationship, fileStatusIterable, fileStatusManager, pathFilter, minimumAge, maximumAge,
previousLatestTimestamp, previousLatestFiles, writerFactory, hdfsPrefix, logger);
}
}
private void validateMandatoryField(String variableName, Object variable) {
if (variable == null) {
throw new IllegalArgumentException(variableName + " is null but must be set");
}
}
}
public ProcessSession getSession() {
return session;
}
public Relationship getSuccessRelationship() {
return successRelationship;
}
public FileStatusIterable getFileStatusIterable() {
return fileStatusIterable;
}
public FileStatusManager getFileStatusManager() {
return fileStatusManager;
}
public PathFilter getPathFilter() {
return pathFilter;
}
public long getMinimumAge() {
return minimumAge;
}
public long getMaximumAge() {
return maximumAge;
}
public long getPreviousLatestTimestamp() {
return previousLatestTimestamp;
}
public List<String> getPreviousLatestFiles() {
return previousLatestFiles;
}
public RecordSetWriterFactory getWriterFactory() {
return writerFactory;
}
public String getHdfsPrefix() {
return hdfsPrefix;
}
public ComponentLog getLogger() {
return logger;
}
}
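
A sketch of the two ways build() resolves; session, successRelationship, fileStatusIterable, fileStatusManager, writerFactory and logger are assumed to be in scope:

// No writerFactory: build() returns the FlowFile-per-entry writer.
final HadoopFileStatusWriter perFileWriter = HadoopFileStatusWriter.builder()
        .session(session)
        .successRelationship(successRelationship)
        .fileStatusIterable(fileStatusIterable)
        .fileStatusManager(fileStatusManager)
        .minimumAge(0)
        .maximumAge(Long.MAX_VALUE)
        .hdfsPrefix("hdfs")
        .build();

// With a writerFactory: build() returns the record-oriented writer, which emits
// a single FlowFile containing one record per listed file.
final HadoopFileStatusWriter recordWriter = HadoopFileStatusWriter.builder()
        .session(session)
        .successRelationship(successRelationship)
        .fileStatusIterable(fileStatusIterable)
        .fileStatusManager(fileStatusManager)
        .minimumAge(0)
        .maximumAge(Long.MAX_VALUE)
        .writerFactory(writerFactory)
        .hdfsPrefix("hdfs")
        .logger(logger)
        .build();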

RecordObjectWriter.java → RecordHadoopFileStatusWriter.java (renamed)

@@ -20,8 +20,10 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
import org.apache.nifi.processors.hadoop.util.FileStatusManager;
@@ -42,9 +44,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.processors.hadoop.ListHDFS.REL_SUCCESS;
public class RecordObjectWriter extends HadoopFileStatusWriter {
public class RecordHadoopFileStatusWriter extends HadoopFileStatusWriter {
private static final RecordSchema RECORD_SCHEMA;
@@ -78,23 +78,19 @@ public class RecordObjectWriter extends HadoopFileStatusWriter {
RECORD_SCHEMA = new SimpleRecordSchema(recordFields);
}
private final RecordSetWriterFactory writerFactory;
private final ComponentLog logger;
public RecordObjectWriter(final ProcessSession session,
final FileStatusIterable fileStatuses,
public RecordHadoopFileStatusWriter(final ProcessSession session,
final Relationship successRelationship,
final FileStatusIterable fileStatusIterable,
final FileStatusManager fileStatusManager,
final PathFilter pathFilter,
final long minimumAge,
final long maximumAge,
final PathFilter pathFilter,
final FileStatusManager fileStatusManager,
final long previousLatestModificationTime,
final long previousLatestTimestamp,
final List<String> previousLatestFiles,
final RecordSetWriterFactory writerFactory,
final String hdfsPrefix,
final ComponentLog logger) {
super(session, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager, previousLatestModificationTime, previousLatestFiles);
this.writerFactory = writerFactory;
this.logger = logger;
super(session, successRelationship, fileStatusIterable, fileStatusManager, pathFilter, minimumAge, maximumAge, previousLatestTimestamp, previousLatestFiles, writerFactory, hdfsPrefix, logger);
}
@Override
@@ -102,10 +98,12 @@ public class RecordObjectWriter extends HadoopFileStatusWriter {
FlowFile flowFile = session.create();
final WriteResult writeResult;
final String mimeType;
try (
final OutputStream out = session.write(flowFile);
final RecordSetWriter recordWriter = writerFactory.createWriter(logger, RECORD_SCHEMA, out, flowFile)
) {
mimeType = recordWriter.getMimeType();
recordWriter.beginRecordSet();
for (FileStatus status : fileStatusIterable) {
if (determineListable(status)) {
@@ -124,8 +122,9 @@
} else {
final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), mimeType);
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
session.transfer(flowFile, successRelationship);
}
}
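
Finally, a hedged sketch of driving the record path from a test. TestRunners and MockRecordWriter come from NiFi's mock framework; the public visibility of the DIRECTORY and RECORD_WRITER descriptors is assumed from the property list shown above:

import org.apache.nifi.processors.hadoop.ListHDFS;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

class ListHDFSRecordOutputSketch {
    void configure() throws InitializationException {
        final TestRunner runner = TestRunners.newTestRunner(ListHDFS.class);
        final MockRecordWriter recordWriter = new MockRecordWriter();
        runner.addControllerService("record-writer", recordWriter);
        runner.enableControllerService(recordWriter);
        runner.setProperty(ListHDFS.DIRECTORY, "/data");
        runner.setProperty(ListHDFS.RECORD_WRITER, "record-writer");
        // A run would now transfer one FlowFile to success carrying the full
        // listing as records, with record.count and mime.type set as above.
    }
}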