diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 330506d14d..e706ab65f0 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -46,9 +46,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
 import org.apache.nifi.processors.hadoop.util.FileStatusManager;
 import org.apache.nifi.processors.hadoop.util.FilterMode;
-import org.apache.nifi.processors.hadoop.util.writer.FlowFileObjectWriter;
 import org.apache.nifi.processors.hadoop.util.writer.HadoopFileStatusWriter;
-import org.apache.nifi.processors.hadoop.util.writer.RecordObjectWriter;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -100,6 +98,7 @@ import static org.apache.nifi.processors.hadoop.util.FilterMode.FILTER_DIRECTORIES_AND_FILES;
 public class ListHDFS extends AbstractHadoopProcessor {
 
     private static final String NON_HIDDEN_FILES_REGEX = "[^\\.].*";
+    private static final String HDFS_ATTRIBUTE_PREFIX = "hdfs";
 
     public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
             .name("Recurse Subdirectories")
@@ -163,6 +162,8 @@ public class ListHDFS extends AbstractHadoopProcessor {
     public static final String LATEST_TIMESTAMP_KEY = "latest.timestamp";
     public static final String LATEST_FILES_KEY = "latest.file.%d";
 
+    private static final List<PropertyDescriptor> LIST_HDFS_PROPERTIES = Arrays.asList(
+            DIRECTORY, RECURSE_SUBDIRS, RECORD_WRITER, FILE_FILTER, FILE_FILTER_MODE, MINIMUM_FILE_AGE, MAXIMUM_FILE_AGE);
     private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
     private Pattern fileFilterRegexPattern;
     private volatile boolean resetState = false;
@@ -177,7 +178,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> props = new ArrayList<>(properties);
-        props.addAll(Arrays.asList(DIRECTORY, RECURSE_SUBDIRS, RECORD_WRITER, FILE_FILTER, FILE_FILTER_MODE, MINIMUM_FILE_AGE, MAXIMUM_FILE_AGE));
+        props.addAll(LIST_HDFS_PROPERTIES);
         return props;
     }
@@ -263,24 +264,31 @@ public class ListHDFS extends AbstractHadoopProcessor {
         final FileStatusManager fileStatusManager = new FileStatusManager(latestTimestamp, latestFiles);
         final Path rootPath = getNormalizedPath(context, DIRECTORY);
-        final FileStatusIterable fileStatuses = new FileStatusIterable(rootPath, recursive, hdfs, getUserGroupInformation());
+        final FileStatusIterable fileStatusIterable = new FileStatusIterable(rootPath, recursive, hdfs, getUserGroupInformation());
 
         final Long minAgeProp = context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
         final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
         final Long maxAgeProp = context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
         final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
 
-        final HadoopFileStatusWriter writer;
-        if (writerFactory == null) {
-            writer = new FlowFileObjectWriter(session, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager, latestTimestamp, latestFiles);
-        } else {
-            writer = new RecordObjectWriter(session, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager, latestTimestamp,
-                    latestFiles, writerFactory, getLogger());
-        }
+        final HadoopFileStatusWriter writer = HadoopFileStatusWriter.builder()
+                .session(session)
+                .successRelationship(getSuccessRelationship())
+                .fileStatusIterable(fileStatusIterable)
+                .fileStatusManager(fileStatusManager)
+                .pathFilter(pathFilter)
+                .minimumAge(minimumAge)
+                .maximumAge(maximumAge)
+                .previousLatestTimestamp(latestTimestamp)
+                .previousLatestFiles(latestFiles)
+                .writerFactory(writerFactory)
+                .hdfsPrefix(getAttributePrefix())
+                .logger(getLogger())
+                .build();
 
         writer.write();
 
-        getLogger().debug("Found a total of {} files in HDFS, {} are listed", fileStatuses.getTotalFileCount(), writer.getListedFileCount());
+        getLogger().debug("Found a total of {} files in HDFS, {} are listed", fileStatusIterable.getTotalFileCount(), writer.getListedFileCount());
 
         if (writer.getListedFileCount() > 0) {
             final Map<String, String> updatedState = new HashMap<>();
@@ -335,4 +343,12 @@ public class ListHDFS extends AbstractHadoopProcessor {
             getLogger().warn("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", e);
         }
     }
+
+    protected Relationship getSuccessRelationship() {
+        return REL_SUCCESS;
+    }
+
+    protected String getAttributePrefix() {
+        return HDFS_ATTRIBUTE_PREFIX;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/FlowFileObjectWriter.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/FlowFileHadoopFileStatusWriter.java
similarity index 51%
rename from nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/FlowFileObjectWriter.java
rename to nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/FlowFileHadoopFileStatusWriter.java
index 4b9b5608bf..6f154be7af 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/FlowFileObjectWriter.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/FlowFileHadoopFileStatusWriter.java
@@ -20,28 +20,31 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processors.hadoop.ListHDFS;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
 import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class FlowFileObjectWriter extends HadoopFileStatusWriter {
-
-    private static final String HDFS_ATTRIBUTE_PREFIX = "hdfs";
-
-    public FlowFileObjectWriter(final ProcessSession session,
-                                final FileStatusIterable fileStatuses,
-                                final long minimumAge,
-                                final long maximumAge,
-                                final PathFilter pathFilter,
-                                final FileStatusManager fileStatusManager,
-                                final long previousLatestModificationTime,
-                                final List<String> previousLatestFiles) {
-        super(session, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager, previousLatestModificationTime, previousLatestFiles);
+public class FlowFileHadoopFileStatusWriter extends HadoopFileStatusWriter {
+    public FlowFileHadoopFileStatusWriter(final ProcessSession session,
+                                          final Relationship successRelationship,
+                                          final FileStatusIterable fileStatusIterable,
+                                          final FileStatusManager fileStatusManager,
+                                          final PathFilter pathFilter,
+                                          final long minimumAge,
+                                          final long maximumAge,
+                                          final long previousLatestTimestamp,
+                                          final List<String> previousLatestFiles,
+                                          final RecordSetWriterFactory writerFactory,
+                                          final String hdfsPrefix,
+                                          final ComponentLog logger) {
+        super(session, successRelationship, fileStatusIterable, fileStatusManager, pathFilter, minimumAge, maximumAge, previousLatestTimestamp, previousLatestFiles, writerFactory, hdfsPrefix, logger);
     }
@@ -52,7 +55,7 @@ public class FlowFileObjectWriter extends HadoopFileStatusWriter {
                 final Map<String, String> attributes = createAttributes(status);
                 FlowFile flowFile = session.create();
                 flowFile = session.putAllAttributes(flowFile, attributes);
-                session.transfer(flowFile, ListHDFS.REL_SUCCESS);
+                session.transfer(flowFile, successRelationship);
 
                 fileStatusManager.update(status);
                 fileCount++;
@@ -64,12 +67,12 @@ public class FlowFileObjectWriter extends HadoopFileStatusWriter {
         final Map<String, String> attributes = new HashMap<>();
         attributes.put(CoreAttributes.FILENAME.key(), status.getPath().getName());
         attributes.put(CoreAttributes.PATH.key(), getAbsolutePath(status.getPath().getParent()));
-        attributes.put(HDFS_ATTRIBUTE_PREFIX + ".owner", status.getOwner());
-        attributes.put(HDFS_ATTRIBUTE_PREFIX + ".group", status.getGroup());
-        attributes.put(HDFS_ATTRIBUTE_PREFIX + ".lastModified", String.valueOf(status.getModificationTime()));
-        attributes.put(HDFS_ATTRIBUTE_PREFIX + ".length", String.valueOf(status.getLen()));
-        attributes.put(HDFS_ATTRIBUTE_PREFIX + ".replication", String.valueOf(status.getReplication()));
-        attributes.put(HDFS_ATTRIBUTE_PREFIX + ".permissions", getPermissionsString(status.getPermission()));
+        attributes.put(hdfsPrefix + ".owner", status.getOwner());
+        attributes.put(hdfsPrefix + ".group", status.getGroup());
+        attributes.put(hdfsPrefix + ".lastModified", String.valueOf(status.getModificationTime()));
+        attributes.put(hdfsPrefix + ".length", String.valueOf(status.getLen()));
+        attributes.put(hdfsPrefix + ".replication", String.valueOf(status.getReplication()));
+        attributes.put(hdfsPrefix + ".permissions", getPermissionsString(status.getPermission()));
         return attributes;
     }
 }
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/HadoopFileStatusWriter.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/HadoopFileStatusWriter.java
index 9adf0d3652..ba0d42e784 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/HadoopFileStatusWriter.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/HadoopFileStatusWriter.java
@@ -21,9 +21,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
 import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
 
 import java.util.List;
@@ -33,33 +36,45 @@ import java.util.List;
 public abstract class HadoopFileStatusWriter {
 
     protected final ProcessSession session;
+    protected final Relationship successRelationship;
     protected final FileStatusIterable fileStatusIterable;
+    protected final FileStatusManager fileStatusManager;
+    protected final PathFilter pathFilter;
     protected final long minimumAge;
     protected final long maximumAge;
-    protected final PathFilter pathFilter;
-    protected final FileStatusManager fileStatusManager;
     protected final long previousLatestTimestamp;
    protected final List<String> previousLatestFiles;
+
+    protected final RecordSetWriterFactory writerFactory;
+    protected final String hdfsPrefix;
+    protected final ComponentLog logger;
+    protected final long currentTimeMillis;
     protected long fileCount;
-    private final long currentTimeMillis;
-
-    HadoopFileStatusWriter(final ProcessSession session,
-                           final FileStatusIterable fileStatusIterable,
-                           final long minimumAge,
-                           final long maximumAge,
-                           final PathFilter pathFilter,
-                           final FileStatusManager fileStatusManager,
-                           final long previousLatestTimestamp,
-                           final List<String> previousLatestFiles) {
+    public HadoopFileStatusWriter(final ProcessSession session,
+                                  final Relationship successRelationship,
+                                  final FileStatusIterable fileStatusIterable,
+                                  final FileStatusManager fileStatusManager,
+                                  final PathFilter pathFilter,
+                                  final long minimumAge,
+                                  final long maximumAge,
+                                  final long previousLatestTimestamp,
+                                  final List<String> previousLatestFiles,
+                                  final RecordSetWriterFactory writerFactory,
+                                  final String hdfsPrefix,
+                                  final ComponentLog logger) {
         this.session = session;
+        this.successRelationship = successRelationship;
         this.fileStatusIterable = fileStatusIterable;
+        this.fileStatusManager = fileStatusManager;
+        this.pathFilter = pathFilter;
         this.minimumAge = minimumAge;
         this.maximumAge = maximumAge;
-        this.pathFilter = pathFilter;
-        this.fileStatusManager = fileStatusManager;
         this.previousLatestTimestamp = previousLatestTimestamp;
         this.previousLatestFiles = previousLatestFiles;
+        this.writerFactory = writerFactory;
+        this.hdfsPrefix = hdfsPrefix;
+        this.logger = logger;
         currentTimeMillis = System.currentTimeMillis();
         fileCount = 0L;
     }
@@ -110,4 +125,152 @@ public abstract class HadoopFileStatusWriter {
         return String.format("%s%s%s", getPerms(permission.getUserAction()), getPerms(permission.getGroupAction()), getPerms(permission.getOtherAction()));
     }
+
+    public static HadoopFileStatusWriter.Builder builder() {
+        return new HadoopFileStatusWriter.Builder();
+    }
+
+    public static class Builder {
+        private ProcessSession session;
+        private FileStatusIterable fileStatusIterable;
+        private long minimumAge;
+        private long maximumAge;
+        private PathFilter pathFilter;
+        private FileStatusManager fileStatusManager;
+        private long previousLatestTimestamp;
+        private List<String> previousLatestFiles;
+        private Relationship successRelationship;
+        private RecordSetWriterFactory writerFactory;
+        private ComponentLog logger;
+        private String hdfsPrefix;
+
+        public HadoopFileStatusWriter.Builder session(final ProcessSession session) {
+            this.session = session;
+            return this;
+        }
+
+        public HadoopFileStatusWriter.Builder fileStatusIterable(final FileStatusIterable fileStatusIterable) {
+            this.fileStatusIterable = fileStatusIterable;
+            return this;
+        }
+
+        public HadoopFileStatusWriter.Builder minimumAge(final long minimumAge) {
+            this.minimumAge = minimumAge;
+            return this;
+        }
+
+        public HadoopFileStatusWriter.Builder maximumAge(final long maximumAge) {
+            this.maximumAge = maximumAge;
+            return this;
+        }
+
+        public HadoopFileStatusWriter.Builder pathFilter(final PathFilter pathFilter) {
+            this.pathFilter = pathFilter;
+            return this;
+        }
+
+        public HadoopFileStatusWriter.Builder fileStatusManager(final FileStatusManager fileStatusManager) {
+            this.fileStatusManager = fileStatusManager;
+            return this;
+        }
+
+        public HadoopFileStatusWriter.Builder previousLatestTimestamp(final long previousLatestTimestamp) {
+            this.previousLatestTimestamp = previousLatestTimestamp;
+            return this;
+        }
+
+        public HadoopFileStatusWriter.Builder previousLatestFiles(final List<String> previousLatestFiles) {
+            this.previousLatestFiles = previousLatestFiles;
+            return this;
+        }
+
+        public HadoopFileStatusWriter.Builder successRelationship(final Relationship successRelationship) {
+            this.successRelationship = successRelationship;
+            return this;
+        }
+
+        public HadoopFileStatusWriter.Builder writerFactory(final RecordSetWriterFactory writerFactory) {
+            this.writerFactory = writerFactory;
+            return this;
+        }
+
+        public HadoopFileStatusWriter.Builder logger(final ComponentLog logger) {
+            this.logger = logger;
+            return this;
+        }
+
+        public HadoopFileStatusWriter.Builder hdfsPrefix(final String hdfsPrefix) {
+            this.hdfsPrefix = hdfsPrefix;
+            return this;
+        }
+
+        public HadoopFileStatusWriter build() {
+            validateMandatoryField("session", session);
+            validateMandatoryField("successRelationship", successRelationship);
+            validateMandatoryField("fileStatusIterable", fileStatusIterable);
+            validateMandatoryField("fileStatusManager", fileStatusManager);
+
+            if (writerFactory == null) {
+                return new FlowFileHadoopFileStatusWriter(session, successRelationship, fileStatusIterable, fileStatusManager, pathFilter, minimumAge, maximumAge,
+                        previousLatestTimestamp, previousLatestFiles, writerFactory, hdfsPrefix, logger);
+            } else {
+                return new RecordHadoopFileStatusWriter(session, successRelationship, fileStatusIterable, fileStatusManager, pathFilter, minimumAge, maximumAge,
+                        previousLatestTimestamp, previousLatestFiles, writerFactory, hdfsPrefix, logger);
+            }
+        }
+
+        private void validateMandatoryField(String variableName, Object variable) {
+            if (variable == null) {
+                throw new IllegalArgumentException(variableName + " is null but must be set");
+            }
+        }
+    }
+
+    public ProcessSession getSession() {
+        return session;
+    }
+
+    public Relationship getSuccessRelationship() {
+        return successRelationship;
+    }
+
+    public FileStatusIterable getFileStatusIterable() {
+        return fileStatusIterable;
+    }
+
+    public FileStatusManager getFileStatusManager() {
+        return fileStatusManager;
+    }
+
+    public PathFilter getPathFilter() {
+        return pathFilter;
+    }
+
+    public long getMinimumAge() {
+        return minimumAge;
+    }
+
+    public long getMaximumAge() {
+        return maximumAge;
+    }
+
+    public long getPreviousLatestTimestamp() {
+        return previousLatestTimestamp;
+    }
+
+    public List<String> getPreviousLatestFiles() {
+        return previousLatestFiles;
+    }
+
+    public RecordSetWriterFactory getWriterFactory() {
+        return writerFactory;
+    }
+
+    public String getHdfsPrefix() {
+        return hdfsPrefix;
+    }
+
+    public ComponentLog getLogger() {
+        return logger;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/RecordObjectWriter.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/RecordHadoopFileStatusWriter.java
similarity index 79%
rename from nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/RecordObjectWriter.java
rename to nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/RecordHadoopFileStatusWriter.java
index 066460a022..77f0c39eb4 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/RecordObjectWriter.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/RecordHadoopFileStatusWriter.java
@@ -20,8 +20,10 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
 import org.apache.nifi.processors.hadoop.util.FileStatusManager;
@@ -42,9 +44,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.nifi.processors.hadoop.ListHDFS.REL_SUCCESS;
-
-public class RecordObjectWriter extends HadoopFileStatusWriter {
+public class RecordHadoopFileStatusWriter extends HadoopFileStatusWriter {
 
     private static final RecordSchema RECORD_SCHEMA;
@@ -78,23 +78,19 @@ public class RecordObjectWriter extends HadoopFileStatusWriter {
         RECORD_SCHEMA = new SimpleRecordSchema(recordFields);
     }
 
-
-    private final RecordSetWriterFactory writerFactory;
-    private final ComponentLog logger;
-
-    public RecordObjectWriter(final ProcessSession session,
-                              final FileStatusIterable fileStatuses,
-                              final long minimumAge,
-                              final long maximumAge,
-                              final PathFilter pathFilter,
-                              final FileStatusManager fileStatusManager,
-                              final long previousLatestModificationTime,
-                              final List<String> previousLatestFiles,
-                              final RecordSetWriterFactory writerFactory,
-                              final ComponentLog logger) {
-        super(session, fileStatuses, minimumAge, maximumAge, pathFilter, fileStatusManager, previousLatestModificationTime, previousLatestFiles);
-        this.writerFactory = writerFactory;
-        this.logger = logger;
+    public RecordHadoopFileStatusWriter(final ProcessSession session,
+                                        final Relationship successRelationship,
+                                        final FileStatusIterable fileStatusIterable,
+                                        final FileStatusManager fileStatusManager,
+                                        final PathFilter pathFilter,
+                                        final long minimumAge,
+                                        final long maximumAge,
+                                        final long previousLatestTimestamp,
+                                        final List<String> previousLatestFiles,
+                                        final RecordSetWriterFactory writerFactory,
+                                        final String hdfsPrefix,
+                                        final ComponentLog logger) {
+        super(session, successRelationship, fileStatusIterable, fileStatusManager, pathFilter, minimumAge, maximumAge, previousLatestTimestamp, previousLatestFiles, writerFactory, hdfsPrefix, logger);
     }
@@ -102,10 +98,12 @@
         FlowFile flowFile = session.create();
 
         final WriteResult writeResult;
+        final String mimeType;
         try (
                 final OutputStream out = session.write(flowFile);
                 final RecordSetWriter recordWriter = writerFactory.createWriter(logger, RECORD_SCHEMA, out, flowFile)
         ) {
+            mimeType = recordWriter.getMimeType();
             recordWriter.beginRecordSet();
             for (FileStatus status : fileStatusIterable) {
                 if (determineListable(status)) {
@@ -124,8 +122,9 @@
         } else {
             final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
             attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+            attributes.put(CoreAttributes.MIME_TYPE.key(), mimeType);
             flowFile = session.putAllAttributes(flowFile, attributes);
-            session.transfer(flowFile, REL_SUCCESS);
+            session.transfer(flowFile, successRelationship);
         }
     }
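
Note on how the refactored pieces fit together (not part of the patch itself): writer selection now lives in HadoopFileStatusWriter.Builder.build(), which returns a FlowFileHadoopFileStatusWriter when no writerFactory is set and a RecordHadoopFileStatusWriter otherwise, and ListHDFS feeds the builder through the two new protected hooks, getSuccessRelationship() and getAttributePrefix(). A minimal sketch of what those hooks enable follows, assuming the classes above are on the classpath; the subclass, its relationship, and the "customfs" prefix are hypothetical illustrations, and a real subclass would also have to expose the new relationship from getRelationships(), omitted here for brevity:

    import org.apache.nifi.processor.Relationship;
    import org.apache.nifi.processors.hadoop.ListHDFS;

    // Hypothetical subclass: the listing loop, state management, and writer
    // dispatch are all inherited from ListHDFS unchanged; only the output
    // relationship and the attribute prefix are overridden.
    public class ListCustomFs extends ListHDFS {

        // Illustrative relationship, not part of the change above.
        public static final Relationship REL_LISTED = new Relationship.Builder()
                .name("listed")
                .description("FlowFiles for listed file statuses")
                .build();

        @Override
        protected Relationship getSuccessRelationship() {
            // Both writer implementations transfer to whatever relationship
            // the builder received, so output is re-routed here.
            return REL_LISTED;
        }

        @Override
        protected String getAttributePrefix() {
            // On the FlowFile-per-file path this yields customfs.owner,
            // customfs.group, customfs.length, ... instead of hdfs.*.
            return "customfs";
        }
    }

Because build() dispatches on writerFactory, such a subclass also gets the record-oriented output (including the mime.type attribute added in this change) for free whenever a Record Writer is configured.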