From 568959baf50626cb8954fab19901590024aba755 Mon Sep 17 00:00:00 2001 From: Ed Date: Tue, 17 Apr 2018 10:26:50 -0400 Subject: [PATCH] NIFI-4906 Add GetHDFSFileInfo - Updated doc for write attributes - Removing redundant files - Updates based on review Changes: - changed validators for regex params (3) - removed TriggerWhenEmpty and TriggerSerially. - removed unused LISTING_LAG_NANOS - removed unimplemented customValidate - removed unsused error handling for building/updating request params - added MIME_TYPE for content - suppressed output to original relationship when scheduled - added full path to original/new FF when not found - added warning messages for huge attribute values. This closes #2639. Signed-off-by: Bryan Bende --- .../nifi-hdfs-processors/pom.xml | 7 + .../processors/hadoop/GetHDFSFileInfo.java | 797 ++++++++++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../hadoop/TestGetHDFSFileInfo.java | 724 ++++++++++++++++ .../testRecursiveGroupAllToAttributes.json | 1 + ...estRecursiveGroupDirToAttributes-dir1.json | 1 + ...estRecursiveGroupDirToAttributes-dir2.json | 1 + ...stRecursiveGroupDirToAttributes-mydir.json | 1 + ...tRecursiveGroupDirToAttributes-regDir.json | 1 + ...RecursiveGroupDirToAttributes-regDir2.json | 1 + ...estRunWithPermissionsExceptionContent.json | 1 + 11 files changed, 1536 insertions(+) create mode 100644 nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java create mode 100644 nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestGetHDFSFileInfo.java create mode 100644 nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupAllToAttributes.json create mode 100644 nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-dir1.json create mode 100644 nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-dir2.json create mode 100644 nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-mydir.json create mode 100644 nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-regDir.json create mode 100644 nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-regDir2.json create mode 100644 nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRunWithPermissionsExceptionContent.json diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml index dd5255754c..291f56bad9 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml @@ -95,6 +95,13 @@ src/test/resources/testdata/.dotfile + src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-mydir.json + src/test/resources/TestGetHDFSFileInfo/testRunWithPermissionsExceptionContent.json + src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-regDir.json + src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-dir1.json + 
src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-dir2.json + src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupAllToAttributes.json + src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-regDir2.json diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java new file mode 100644 index 0000000000..cda53f016c --- /dev/null +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFSFileInfo.java @@ -0,0 +1,797 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hadoop; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Groupping; + +@InputRequirement(Requirement.INPUT_ALLOWED) +@Tags({"hadoop", "HDFS", "get", "list", "ingest", "source", "filesystem"}) 
+@CapabilityDescription("Retrieves a listing of files and directories from HDFS. " + + "This processor creates a FlowFile(s) that represents the HDFS file/dir with relevant information. " + + "Main purpose of this processor to provide functionality similar to HDFS Client, i.e. count, du, ls, test, etc. " + + "Unlike ListHDFS, this processor is stateless, supports incoming connections and provides information on a dir level. " + ) +@WritesAttributes({ + @WritesAttribute(attribute="hdfs.objectName", description="The name of the file/dir found on HDFS."), + @WritesAttribute(attribute="hdfs.path", description="The path is set to the absolute path of the object's parent directory on HDFS. " + + "For example, if an object is a directory 'foo', under directory '/bar' then 'hdfs.objectName' will have value 'foo', and 'hdfs.path' will be '/bar'"), + @WritesAttribute(attribute="hdfs.type", description="The type of an object. Possible values: directory, file, link"), + @WritesAttribute(attribute="hdfs.owner", description="The user that owns the object in HDFS"), + @WritesAttribute(attribute="hdfs.group", description="The group that owns the object in HDFS"), + @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the object in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"), + @WritesAttribute(attribute="hdfs.length", description="" + + "In case of files: The number of bytes in the file in HDFS. " + + "In case of dirs: Retuns storage space consumed by directory. " + + ""), + @WritesAttribute(attribute="hdfs.count.files", description="In case of type='directory' will represent total count of files under this dir. " + + "Won't be populated to other types of HDFS objects. "), + @WritesAttribute(attribute="hdfs.count.dirs", description="In case of type='directory' will represent total count of directories under this dir (including itself). " + + "Won't be populated to other types of HDFS objects. "), + @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for the file"), + @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the object in HDFS. This is formatted as 3 characters for the owner, " + + "3 for the group, and 3 for other users. For example rw-rw-r--"), + @WritesAttribute(attribute="hdfs.status", description="The status contains comma separated list of file/dir paths, which couldn't be listed/accessed. " + + "Status won't be set if no errors occured."), + @WritesAttribute(attribute="hdfs.full.tree", description="When destination is 'attribute', will be populated with full tree of HDFS directory in JSON format." + + "WARNING: In case when scan finds thousands or millions of objects, having huge values in attribute could impact flow file repo and GC/heap usage. 
" + + "Use content destination for such cases") +}) +@SeeAlso({ListHDFS.class, GetHDFS.class, FetchHDFS.class, PutHDFS.class}) +public class GetHDFSFileInfo extends AbstractHadoopProcessor { + public static final String APPLICATION_JSON = "application/json"; + public static final PropertyDescriptor FULL_PATH = new PropertyDescriptor.Builder() + .displayName("Full path") + .name("gethdfsfileinfo-full-path") + .description("A directory to start listing from, or a file's full path.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .defaultValue("") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder() + .displayName("Recurse Subdirectories") + .name("gethdfsfileinfo-recurse-subdirs") + .description("Indicates whether to list files from subdirectories of the HDFS directory") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + public static final PropertyDescriptor DIR_FILTER = new PropertyDescriptor.Builder() + .displayName("Directory Filter") + .name("gethdfsfileinfo-dir-filter") + .description("Regex. Only directories whose names match the given regular expression will be picked up. If not provided, any filter would be apply (performance considerations).") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true)) + .build(); + + public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder() + .displayName("File Filter") + .name("gethdfsfileinfo-file-filter") + .description("Regex. Only files whose names match the given regular expression will be picked up. If not provided, any filter would be apply (performance considerations).") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true)) + .build(); + + public static final PropertyDescriptor FILE_EXCLUDE_FILTER = new PropertyDescriptor.Builder() + .displayName("Exclude Files") + .name("gethdfsfileinfo-file-exclude-filter") + .description("Regex. Files whose names match the given regular expression will not be picked up. 
If not provided, no filter will be applied (which is better for performance).") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true)) + .build(); + + public static final PropertyDescriptor IGNORE_DOTTED_DIRS = new PropertyDescriptor.Builder() + .displayName("Ignore Dotted Directories") + .name("gethdfsfileinfo-ignore-dotted-dirs") + .description("If true, directories whose names begin with a dot (\".\") will be ignored") + .required(true) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder() + .displayName("Ignore Dotted Files") + .name("gethdfsfileinfo-ignore-dotted-files") + .description("If true, files whose names begin with a dot (\".\") will be ignored") + .required(true) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + static final AllowableValue GROUP_ALL = new AllowableValue("gethdfsfileinfo-group-all", "All", + "Group all results into a single flowfile."); + + static final AllowableValue GROUP_PARENT_DIR = new AllowableValue("gethdfsfileinfo-group-parent-dir", "Parent Directory", + "Group HDFS objects by their parent directories only. The processor will generate a flowfile for each directory (if recursive). " + + "If the 'Recurse Subdirectories' property is set to 'false', this will have the same effect as 'All'"); + + static final AllowableValue GROUP_NONE = new AllowableValue("gethdfsfileinfo-group-none", "None", + "Don't group results. Generate a flowfile for each HDFS object."); + + public static final PropertyDescriptor GROUPING = new PropertyDescriptor.Builder() + .displayName("Group Results") + .name("gethdfsfileinfo-group") + .description("Specifies how to group HDFS objects in the results") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues(GROUP_ALL, GROUP_PARENT_DIR, GROUP_NONE) + .defaultValue(GROUP_ALL.getValue()) + .build(); + + static final AllowableValue DESTINATION_ATTRIBUTES = new AllowableValue("gethdfsfileinfo-dest-attr", "Attributes", + "Details of the given HDFS object will be stored in the flowfile's attributes. " + + "WARNING: When a scan finds thousands or millions of objects, having huge values in an attribute could impact the flow file repo and GC/heap usage. " + + "Use the content destination for such cases."); + + static final AllowableValue DESTINATION_CONTENT = new AllowableValue("gethdfsfileinfo-dest-content", "Content", + "Details of the given HDFS object will be stored in the flowfile content in JSON format"); + + public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() + .displayName("Destination") + .name("gethdfsfileinfo-destination") + .description("Sets the destination for the results. When set to 'Content', flowfile attributes won't be used for storing results. 
") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues(DESTINATION_ATTRIBUTES, DESTINATION_CONTENT) + .defaultValue(DESTINATION_CONTENT.getValue()) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All successfully generated FlowFiles are transferred to this relationship") + .build(); + + public static final Relationship REL_NOT_FOUND = new Relationship.Builder() + .name("not found") + .description("If no objects are found, original FlowFile are transferred to this relationship") + .build(); + + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("Original FlowFiles are transferred to this relationship") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("All failed attempts to access HDFS will be routed to this relationship") + .build(); + + private HDFSFileInfoRequest req; + + @Override + protected void init(final ProcessorInitializationContext context) { + super.init(context); + } + + @Override + protected List getSupportedPropertyDescriptors() { + final List props = new ArrayList<>(properties); + props.add(FULL_PATH); + props.add(RECURSE_SUBDIRS); + props.add(DIR_FILTER); + props.add(FILE_FILTER); + props.add(FILE_EXCLUDE_FILTER); + props.add(IGNORE_DOTTED_DIRS); + props.add(IGNORE_DOTTED_FILES); + props.add(GROUPING); + props.add(DESTINATION); + return props; + } + + @Override + public Set getRelationships() { + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_NOT_FOUND); + relationships.add(REL_ORIGINAL); + relationships.add(REL_FAILURE); + return relationships; + } + + @Override + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + super.onPropertyModified(descriptor, oldValue, newValue); + // drop request details to rebuild it + req = null; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile ff = null; + if (context.hasIncomingConnection()) { + ff = session.get(); + + // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. + // However, if we have no FlowFile and we have connections coming from other Processors, then + // we know that we should run only if we have a FlowFile. 
+ if (ff == null && context.hasNonLoopConnection()) { + context.yield(); + return; + } + } + boolean scheduledFF = false; + if (ff == null) { + ff = session.create(); + scheduledFF = true; + } + + if (req == null) { + //rebuild the request details based on + req = buildRequestDetails(context, session, ff); + }else { + //avoid rebuilding req object's patterns in order to have better performance + req = updateRequestDetails(context, session, ff); + } + + try { + final FileSystem hdfs = getFileSystem(); + UserGroupInformation ugi = getUserGroupInformation(); + HDFSObjectInfoDetails res = walkHDFSTree(context, session, ff, hdfs, ugi, req, null, false); + if (res == null) { + ff = session.putAttribute(ff, "hdfs.status", "Path not found: " + req.fullPath); + session.transfer(ff, REL_NOT_FOUND); + return; + } + if (!scheduledFF) { + session.transfer(ff, REL_ORIGINAL); + }else { + session.remove(ff); + } + } catch (final IOException | IllegalArgumentException e) { + getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e}); + ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e); + session.transfer(ff, REL_FAILURE); + return; + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + getLogger().error("Interrupted while performing listing of HDFS", e); + ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e); + session.transfer(ff, REL_FAILURE); + return; + } catch (final Exception e) { + getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e}); + ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e); + session.transfer(ff, REL_FAILURE); + return; + } + } + + + /* + * Walks thru HDFS tree. This method will return null to the main if there is no provided path existing. + */ + protected HDFSObjectInfoDetails walkHDFSTree(final ProcessContext context, final ProcessSession session, FlowFile origFF, final FileSystem hdfs, + final UserGroupInformation ugi, final HDFSFileInfoRequest req, HDFSObjectInfoDetails parent, final boolean statsOnly) throws Exception{ + + final HDFSObjectInfoDetails p = parent; + + if (!ugi.doAs((PrivilegedExceptionAction) () -> hdfs.exists(p != null ? 
p.getPath() : new Path(req.fullPath)))) { + return null; + } + + if (parent == null) { + parent = new HDFSObjectInfoDetails(ugi.doAs((PrivilegedExceptionAction) () -> hdfs.getFileStatus(new Path(req.fullPath)))); + } + if (parent.isFile() && p == null) { + //single file path requested and found, lets send to output: + processHDFSObject(context, session, origFF, req, parent, true); + return parent; + } + + final Path path = parent.getPath(); + + FileStatus[] listFSt = null; + try { + listFSt = ugi.doAs((PrivilegedExceptionAction) () -> hdfs.listStatus(path)); + }catch (IOException e) { + parent.error = "Couldn't list directory: " + e; + processHDFSObject(context, session, origFF, req, parent, p == null); + return parent; //File not found exception, or access denied - don't interrupt, just don't list + } + if (listFSt != null) { + for (FileStatus f : listFSt) { + HDFSObjectInfoDetails o = new HDFSObjectInfoDetails(f); + HDFSObjectInfoDetails vo = validateMatchingPatterns(o, req); + if (o.isDirectory() && !o.isSymlink() && req.isRecursive) { + o = walkHDFSTree(context, session, origFF, hdfs, ugi, req, o, vo == null || statsOnly); + parent.countDirs += o.countDirs; + parent.totalLen += o.totalLen; + parent.countFiles += o.countFiles; + }else if (o.isDirectory() && o.isSymlink()) { + parent.countDirs += 1; + }else if (o.isFile() && !o.isSymlink()) { + parent.countFiles += 1; + parent.totalLen += o.getLen(); + }else if (o.isFile() && o.isSymlink()) { + parent.countFiles += 1; // do not add length of the symlink, as it doesn't consume space under THIS directory, but count files, as it is still an object. + } + + // Decide what to do with child: if requested FF per object or per dir - just emit new FF with info in 'o' object + if (vo != null && !statsOnly) { + parent.addChild(vo); + if (p != null && req.isRecursive + && vo.isFile() && !vo.isSymlink()) { + processHDFSObject(context, session, origFF, req, vo, false); + } + } + } + if (!statsOnly) { + processHDFSObject(context, session, origFF, req, parent, p==null); + } + if (req.groupping != Groupping.ALL) { + parent.setChildren(null); //we need children in full tree only when single output requested. + } + } + + return parent; + } + + protected HDFSObjectInfoDetails validateMatchingPatterns(final HDFSObjectInfoDetails o, HDFSFileInfoRequest req) { + if (o == null || o.getPath() == null) { + return null; + } + + if (o.isFile()) { + if (req.isIgnoreDotFiles && o.getPath().getName().startsWith(".")) { + return null; + }else if (req.fileExcludeFilter != null && req.fileExcludeFilter.matcher(o.getPath().getName()).matches()) { + return null; + }else if (req.fileFilter == null) { + return o; + }else if (req.fileFilter != null && req.fileFilter.matcher(o.getPath().getName()).matches()) { + return o; + }else { + return null; + } + } + if (o.isDirectory()) { + if (req.isIgnoreDotDirs && o.getPath().getName().startsWith(".")) { + return null; + }else if (req.dirFilter == null) { + return o; + }else if (req.dirFilter != null && req.dirFilter.matcher(o.getPath().getName()).matches()) { + return o; + }else { + return null; + } + } + return null; + } + + /* + * Checks whether HDFS object should be sent to output. + * If it should be sent, new flowfile will be created, its content and attributes will be populated according to other request params. 
+ */ + protected HDFSObjectInfoDetails processHDFSObject(final ProcessContext context, final ProcessSession session, + FlowFile origFF, final HDFSFileInfoRequest req, final HDFSObjectInfoDetails o, final boolean isRoot) { + if (o.isFile() && req.groupping != Groupping.NONE) { + return null; //there is grouping by either root directory or every directory, no need to print separate files. + } + if (o.isDirectory() && o.isSymlink() && req.groupping != Groupping.NONE) { + return null; //ignore symlink dirs an + } + if (o.isDirectory() && req.groupping == Groupping.ALL && !isRoot) { + return null; + } + FlowFile ff = session.create(origFF); + + //if destination type is content - always add mime type + if (req.isDestContent) { + ff = session.putAttribute(ff, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON); + } + + //won't combine conditions for similar actions for better readability and maintenance. + if (o.isFile() && isRoot && req.isDestContent) { + ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes())); + // ------------------------------ + }else if (o.isFile() && isRoot && !req.isDestContent) { + ff = session.putAllAttributes(ff, o.toAttributesMap()); + // ------------------------------ + }else if (o.isFile() && req.isDestContent) { + ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes())); + // ------------------------------ + }else if (o.isFile() && !req.isDestContent) { + ff = session.putAllAttributes(ff, o.toAttributesMap()); + // ------------------------------ + }else if (o.isDirectory() && o.isSymlink() && req.isDestContent) { + ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes())); + // ------------------------------ + }else if (o.isDirectory() && o.isSymlink() && !req.isDestContent) { + ff = session.putAllAttributes(ff, o.toAttributesMap()); + // ------------------------------ + }else if (o.isDirectory() && req.groupping == Groupping.NONE && req.isDestContent) { + o.setChildren(null); + ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes())); + // ------------------------------ + }else if (o.isDirectory() && req.groupping == Groupping.NONE && !req.isDestContent) { + ff = session.putAllAttributes(ff, o.toAttributesMap()); + // ------------------------------ + }else if (o.isDirectory() && req.groupping == Groupping.DIR && req.isDestContent) { + ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes())); + // ------------------------------ + }else if (o.isDirectory() && req.groupping == Groupping.DIR && !req.isDestContent) { + ff = session.putAllAttributes(ff, o.toAttributesMap()); + ff = session.putAttribute(ff, "hdfs.full.tree", o.toJsonString()); + // ------------------------------ + }else if (o.isDirectory() && req.groupping == Groupping.ALL && req.isDestContent) { + ff = session.write(ff, (out) -> out.write(o.toJsonString().getBytes())); + // ------------------------------ + }else if (o.isDirectory() && req.groupping == Groupping.ALL && !req.isDestContent) { + ff = session.putAllAttributes(ff, o.toAttributesMap()); + ff = session.putAttribute(ff, "hdfs.full.tree", o.toJsonString()); + }else { + getLogger().error("Illegal State!"); + session.remove(ff); + return null; + } + + session.transfer(ff, REL_SUCCESS); + return o; + } + + /* + * Returns permissions in readable format like rwxr-xr-x (755) + */ + protected String getPerms(final FsPermission permission) { + + final StringBuilder sb = new StringBuilder(); + for (FsAction action : new FsAction[]{permission.getUserAction(), 
permission.getGroupAction(), permission.getOtherAction()}) { + if (action.implies(FsAction.READ)) { + sb.append("r"); + } else { + sb.append("-"); + } + + if (action.implies(FsAction.WRITE)) { + sb.append("w"); + } else { + sb.append("-"); + } + + if (action.implies(FsAction.EXECUTE)) { + sb.append("x"); + } else { + sb.append("-"); + } + } + + return sb.toString(); + } + + /* + * Creates internal request object and initialize the fields that won't be changed every call (onTrigger). + * Dynamic fields will be updated per each call separately. + */ + protected HDFSFileInfoRequest buildRequestDetails(ProcessContext context, ProcessSession session, FlowFile ff) { + HDFSFileInfoRequest req = new HDFSFileInfoRequest(); + req.fullPath = context.getProperty(FULL_PATH).evaluateAttributeExpressions(ff).getValue(); + req.isRecursive = context.getProperty(RECURSE_SUBDIRS).asBoolean(); + + PropertyValue pv = null; + String v = null; + + if (context.getProperty(DIR_FILTER).isSet() && (pv=context.getProperty(DIR_FILTER).evaluateAttributeExpressions(ff))!=null) { + v = pv.getValue(); + req.dirFilter = v == null ? null : Pattern.compile(v); + } + + if (context.getProperty(FILE_FILTER).isSet() && (pv=context.getProperty(FILE_FILTER).evaluateAttributeExpressions(ff))!=null) { + v = pv.getValue(); + req.fileFilter = v == null ? null : Pattern.compile(v); + } + + if (context.getProperty(FILE_EXCLUDE_FILTER).isSet() && (pv=context.getProperty(FILE_EXCLUDE_FILTER).evaluateAttributeExpressions(ff))!=null) { + v = pv.getValue(); + req.fileExcludeFilter = v == null ? null : Pattern.compile(v); + } + + req.isIgnoreDotFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean(); + req.isIgnoreDotDirs = context.getProperty(IGNORE_DOTTED_DIRS).asBoolean(); + + req.groupping = HDFSFileInfoRequest.Groupping.getEnum(context.getProperty(GROUPING).getValue()); + + v = context.getProperty(DESTINATION).getValue(); + if (DESTINATION_CONTENT.getValue().equals(v)) { + req.isDestContent = true; + }else { + req.isDestContent = false; + } + + return req; + } + + /* + * Creates internal request object if not created previously, and updates it with dynamic property every time onTrigger is called. + * Avoids creating regex Patter objects unless their actual value are changed due to evaluation of EL + */ + protected HDFSFileInfoRequest updateRequestDetails(ProcessContext context, ProcessSession session, FlowFile ff) { + + if (req == null) { + return buildRequestDetails(context, session, ff); + } + req.fullPath = context.getProperty(FULL_PATH).evaluateAttributeExpressions(ff).getValue(); + + String currValue = null; + String oldValue = null; + + currValue = context.getProperty(DIR_FILTER).evaluateAttributeExpressions(ff).getValue(); + oldValue = req.dirFilter == null ? null : req.dirFilter.toString(); + if (StringUtils.compare(currValue, oldValue) != 0) { + req.dirFilter = currValue == null ? null : Pattern.compile(currValue); + } + + + currValue = context.getProperty(FILE_FILTER).evaluateAttributeExpressions(ff).getValue(); + oldValue = req.fileFilter == null ? null : req.fileFilter.toString(); + if (StringUtils.compare(currValue, oldValue) != 0) { + req.fileFilter = currValue == null ? null : Pattern.compile(currValue); + } + + + currValue = context.getProperty(FILE_EXCLUDE_FILTER).evaluateAttributeExpressions(ff).getValue(); + oldValue = req.fileExcludeFilter == null ? null : req.fileExcludeFilter.toString(); + if (StringUtils.compare(currValue, oldValue) != 0) { + req.fileExcludeFilter = currValue == null ? 
null : Pattern.compile(currValue); + } + + return req; + } + + /* + * Keeps all request details in single object. + */ + static class HDFSFileInfoRequest{ + enum Groupping { + ALL(GROUP_ALL.getValue()), + DIR(GROUP_PARENT_DIR.getValue()), + NONE(GROUP_NONE.getValue()); + + private String val; + + Groupping(String val){ + this.val = val; + } + + public String toString() { + return this.val; + } + + public static Groupping getEnum(String value) { + for (Groupping v : values()) { + if (v.val.equals(value)) { + return v; + } + } + return null; + } + } + + String fullPath; + boolean isRecursive; + Pattern dirFilter; + Pattern fileFilter; + Pattern fileExcludeFilter; + boolean isIgnoreDotFiles; + boolean isIgnoreDotDirs; + boolean isDestContent; + Groupping groupping; + } + + /* + * Keeps details of HDFS objects. + * This class is based on FileStatus and adds additional feature/properties for count, total size of directories, and subtrees/hierarchy of recursive listings. + */ + class HDFSObjectInfoDetails extends FileStatus{ + + private long countFiles; + private long countDirs = 1; + private long totalLen; + private Collection children = new LinkedList<>(); + private String error; + + HDFSObjectInfoDetails(FileStatus fs) throws IOException{ + super(fs); + } + + public long getCountFiles() { + return countFiles; + } + + public void setCountFiles(long countFiles) { + this.countFiles = countFiles; + } + + public long getCountDirs() { + return countDirs; + } + + public void setCountDirs(long countDirs) { + this.countDirs = countDirs; + } + + public long getTotalLen() { + return totalLen; + } + + public void setTotalLen(long totalLen) { + this.totalLen = totalLen; + } + + public Collection getChildren() { + return children; + } + + public String getError() { + return error; + } + + public void setError(String error) { + this.error = error; + } + + public void setChildren(Collection children) { + this.children = children; + } + + public void addChild(HDFSObjectInfoDetails child) { + this.children.add(child); + } + + public void updateTotals(boolean deepUpdate) { + if (deepUpdate) { + this.countDirs = 1; + this.countFiles = 0; + this.totalLen = 0; + } + + for(HDFSObjectInfoDetails c : children) { + if (c.isSymlink()) { + continue; //do not count symlinks. they either will be counted under their actual directories, or won't be count if actual location is not under provided root for scan. + }else if (c.isDirectory()) { + if (deepUpdate) { + c.updateTotals(deepUpdate); + } + this.totalLen += c.totalLen; + this.countDirs += c.countDirs; + this.countFiles += c.countFiles; + }else if (c.isFile()) { + this.totalLen += c.getLen(); + this.countFiles++; + } + } + + } + + /* + * Since, by definition, FF will keep only attributes for parent/single object, we don't need to recurse the children + */ + public Map toAttributesMap(){ + Map map = new HashMap<>(); + + map.put("hdfs.objectName", this.getPath().getName()); + map.put("hdfs.path", Path.getPathWithoutSchemeAndAuthority(this.getPath().getParent()).toString()); + map.put("hdfs.type", this.isSymlink() ? "link" : (this.isDirectory() ? "directory" : "file")); + map.put("hdfs.owner", this.getOwner()); + map.put("hdfs.group", this.getGroup()); + map.put("hdfs.lastModified", Long.toString(this.getModificationTime())); + map.put("hdfs.length", Long.toString(this.isDirectory() ? 
this.totalLen : this.getLen())); + map.put("hdfs.replication", Long.toString(this.getReplication())); + if (this.isDirectory()) { + map.put("hdfs.count.files", Long.toString(this.getCountFiles())); + map.put("hdfs.count.dirs", Long.toString(this.getCountDirs())); + } + map.put("hdfs.permissions", getPerms(this.getPermission())); + if (this.error != null) { + map.put("hdfs.status", "Error: " + this.error); + } + + return map; + } + + /* + * The decision to use custom serialization (vs jackson/velocity/gson/etc) is behind the performance. + * This object is pretty simple, with limited number of members of simple types. + */ + public String toJsonString() { + StringBuilder sb = new StringBuilder(); + return toJsonString(sb).toString(); + } + + private StringBuilder toJsonString(StringBuilder sb) { + sb.append("{"); + + appendProperty(sb, "objectName", this.getPath().getName()).append(","); + appendProperty(sb, "path", Path.getPathWithoutSchemeAndAuthority(this.getPath().getParent()).toString()).append(","); + appendProperty(sb, "type", this.isSymlink() ? "link" : (this.isDirectory() ? "directory" : "file")).append(","); + appendProperty(sb, "owner", this.getOwner()).append(","); + appendProperty(sb, "group", this.getGroup()).append(","); + appendProperty(sb, "lastModified", Long.toString(this.getModificationTime())).append(","); + appendProperty(sb, "length", Long.toString(this.isDirectory() ? this.totalLen : this.getLen())).append(","); + appendProperty(sb, "replication", Long.toString(this.getReplication())).append(","); + if (this.isDirectory()) { + appendProperty(sb, "countFiles", Long.toString(this.getCountFiles())).append(","); + appendProperty(sb, "countDirs", Long.toString(this.getCountDirs())).append(","); + } + appendProperty(sb, "permissions", getPerms(this.getPermission())); + + if (this.error != null) { + sb.append(","); + appendProperty(sb, "status", this.error); + } + + if (this.getChildren() != null && this.getChildren().size() > 0) { + sb.append(",\"content\":["); + for (HDFSObjectInfoDetails c : this.getChildren()) { + c.toJsonString(sb).append(","); + } + sb.deleteCharAt(sb.length()-1).append("]"); + } + sb.append("}"); + return sb; + } + + private StringBuilder appendProperty(StringBuilder sb, String name, String value) { + return sb.append("\"").append(name).append("\":\"").append(value == null? 
"": value).append("\""); + } + } +} diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 920776ab08..6b8e7f1a1b 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -21,3 +21,4 @@ org.apache.nifi.processors.hadoop.ListHDFS org.apache.nifi.processors.hadoop.PutHDFS org.apache.nifi.processors.hadoop.DeleteHDFS org.apache.nifi.processors.hadoop.MoveHDFS +org.apache.nifi.processors.hadoop.GetHDFSFileInfo diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestGetHDFSFileInfo.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestGetHDFSFileInfo.java new file mode 100644 index 0000000000..db0678be12 --- /dev/null +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestGetHDFSFileInfo.java @@ -0,0 +1,724 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hadoop; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; +import org.apache.nifi.hadoop.KerberosProperties; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Maps; + +public class TestGetHDFSFileInfo { + + private TestRunner runner; + private GetHDFSFileInfoWithMockedFileSystem proc; + private NiFiProperties mockNiFiProperties; + private KerberosProperties kerberosProperties; + + @Before + public void setup() throws InitializationException { + mockNiFiProperties = mock(NiFiProperties.class); + when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null); + kerberosProperties = new KerberosProperties(null); + + proc = new GetHDFSFileInfoWithMockedFileSystem(kerberosProperties); + runner = TestRunners.newTestRunner(proc); + + runner.setProperty(GetHDFSFileInfo.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site.xml"); + } + + @Test + public void testNoRunOnIncomingConnectionExists() throws InterruptedException { + + setFileSystemBasicTree(proc.fileSystem); + + runner.setIncomingConnection(true); + runner.setProperty(GetHDFSFileInfo.FULL_PATH, "${literal('/some/home/mydir'):substring(0,15)}"); + runner.setProperty(GetHDFSFileInfo.RECURSE_SUBDIRS, "true"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_DIRS, "true"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_FILES, "true"); + runner.setProperty(GetHDFSFileInfo.DESTINATION, GetHDFSFileInfo.DESTINATION_CONTENT); + + runner.run(); + + runner.assertTransferCount(GetHDFSFileInfo.REL_ORIGINAL, 0); + runner.assertTransferCount(GetHDFSFileInfo.REL_SUCCESS, 0); + runner.assertTransferCount(GetHDFSFileInfo.REL_FAILURE, 0); + runner.assertTransferCount(GetHDFSFileInfo.REL_NOT_FOUND, 0); + } + + @Test + public void testRunOnScheduleNoConnections() throws InterruptedException { + + setFileSystemBasicTree(proc.fileSystem); + + runner.setIncomingConnection(false); + runner.setProperty(GetHDFSFileInfo.FULL_PATH, "/some/home/mydir"); + runner.setProperty(GetHDFSFileInfo.RECURSE_SUBDIRS, "false"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_DIRS, "true"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_FILES, "true"); + runner.setProperty(GetHDFSFileInfo.DESTINATION, GetHDFSFileInfo.DESTINATION_CONTENT); + + runner.run(); + + runner.assertTransferCount(GetHDFSFileInfo.REL_ORIGINAL, 0); + runner.assertTransferCount(GetHDFSFileInfo.REL_SUCCESS, 1); + runner.assertTransferCount(GetHDFSFileInfo.REL_FAILURE, 0); + 
runner.assertTransferCount(GetHDFSFileInfo.REL_NOT_FOUND, 0); + } + + @Test + public void testValidELFunction() throws InterruptedException { + + setFileSystemBasicTree(proc.fileSystem); + + runner.setIncomingConnection(true); + runner.setProperty(GetHDFSFileInfo.FULL_PATH, "${literal('/some/home/mydir'):substring(0,16)}"); + runner.setProperty(GetHDFSFileInfo.DIR_FILTER, "${literal('_^(dir.*)$_'):substring(1,10)}"); + runner.setProperty(GetHDFSFileInfo.FILE_FILTER, "${literal('_^(.*)$_'):substring(1,7)}"); + runner.setProperty(GetHDFSFileInfo.FILE_EXCLUDE_FILTER, "${literal('_^(none.*)$_'):substring(1,11)}"); + runner.setProperty(GetHDFSFileInfo.RECURSE_SUBDIRS, "false"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_DIRS, "true"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_FILES, "true"); + runner.setProperty(GetHDFSFileInfo.DESTINATION, GetHDFSFileInfo.DESTINATION_CONTENT); + runner.enqueue("foo", new HashMap()); + + runner.run(); + + runner.assertTransferCount(GetHDFSFileInfo.REL_ORIGINAL, 1); + + final MockFlowFile mff = runner.getFlowFilesForRelationship(GetHDFSFileInfo.REL_ORIGINAL).get(0); + ProcessContext context = runner.getProcessContext(); + + assertEquals(context.getProperty(GetHDFSFileInfo.FULL_PATH).evaluateAttributeExpressions(mff).getValue(), "/some/home/mydir"); + assertEquals(context.getProperty(GetHDFSFileInfo.DIR_FILTER).evaluateAttributeExpressions(mff).getValue(), "^(dir.*)$"); + assertEquals(context.getProperty(GetHDFSFileInfo.FILE_FILTER).evaluateAttributeExpressions(mff).getValue(), "^(.*)$"); + assertEquals(context.getProperty(GetHDFSFileInfo.FILE_EXCLUDE_FILTER).evaluateAttributeExpressions(mff).getValue(), "^(none.*)$"); + } + + @Test + public void testRunWithConnections() throws InterruptedException { + + setFileSystemBasicTree(proc.fileSystem); + + Map attributes = Maps.newHashMap(); + attributes.put("input.dir", "/some/home/mydir"); + + runner.setIncomingConnection(true); + runner.setProperty(GetHDFSFileInfo.FULL_PATH, "${input.dir}"); + runner.setProperty(GetHDFSFileInfo.RECURSE_SUBDIRS, "false"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_DIRS, "true"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_FILES, "true"); + runner.setProperty(GetHDFSFileInfo.DESTINATION, GetHDFSFileInfo.DESTINATION_CONTENT); + + runner.enqueue("foo", attributes); + runner.run(); + + runner.assertTransferCount(GetHDFSFileInfo.REL_ORIGINAL, 1); + + final MockFlowFile mff = runner.getFlowFilesForRelationship(GetHDFSFileInfo.REL_ORIGINAL).get(0); + ProcessContext context = runner.getProcessContext(); + + assertEquals(context.getProperty(GetHDFSFileInfo.FULL_PATH).evaluateAttributeExpressions(mff).getValue(), "/some/home/mydir"); + } + + @Test + public void testRunWithIOException() throws InterruptedException { + + setFileSystemBasicTree(proc.fileSystem); + proc.fileSystem.addFileStatus(proc.fileSystem.newDir("/some/home/mydir"), proc.fileSystem.newFile("/some/home/mydir/exception_java.io.InterruptedIOException")); + + runner.setIncomingConnection(false); + runner.setProperty(GetHDFSFileInfo.FULL_PATH, "/some/home/mydir/exception_java.io.InterruptedIOException"); + runner.setProperty(GetHDFSFileInfo.RECURSE_SUBDIRS, "false"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_DIRS, "true"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_FILES, "true"); + runner.setProperty(GetHDFSFileInfo.DESTINATION, GetHDFSFileInfo.DESTINATION_CONTENT); + + runner.run(); + + runner.assertTransferCount(GetHDFSFileInfo.REL_ORIGINAL, 0); + 
runner.assertTransferCount(GetHDFSFileInfo.REL_SUCCESS, 0); + runner.assertTransferCount(GetHDFSFileInfo.REL_FAILURE, 1); + runner.assertTransferCount(GetHDFSFileInfo.REL_NOT_FOUND, 0); + + final MockFlowFile mff = runner.getFlowFilesForRelationship(GetHDFSFileInfo.REL_FAILURE).get(0); + mff.assertAttributeEquals("hdfs.status", "Failed due to: java.io.InterruptedIOException"); + } + + @Test + public void testRunWithPermissionsExceptionAttributes() throws InterruptedException { + + setFileSystemBasicTree(proc.fileSystem); + proc.fileSystem.addFileStatus(proc.fileSystem.newDir("/some/home/mydir/dir1"), proc.fileSystem.newDir("/some/home/mydir/dir1/list_exception_java.io.InterruptedIOException")); + + runner.setIncomingConnection(false); + runner.setProperty(GetHDFSFileInfo.FULL_PATH, "/some/home/mydir"); + runner.setProperty(GetHDFSFileInfo.RECURSE_SUBDIRS, "true"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_DIRS, "true"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_FILES, "true"); + runner.setProperty(GetHDFSFileInfo.DESTINATION, GetHDFSFileInfo.DESTINATION_ATTRIBUTES); + runner.setProperty(GetHDFSFileInfo.GROUPING, GetHDFSFileInfo.GROUP_PARENT_DIR); + + runner.run(); + + runner.assertTransferCount(GetHDFSFileInfo.REL_ORIGINAL, 0); + runner.assertTransferCount(GetHDFSFileInfo.REL_SUCCESS, 6); + runner.assertTransferCount(GetHDFSFileInfo.REL_FAILURE, 0); + runner.assertTransferCount(GetHDFSFileInfo.REL_NOT_FOUND, 0); + } + + @Test + public void testRunWithPermissionsExceptionContent() throws Exception { + + setFileSystemBasicTree(proc.fileSystem); + proc.fileSystem.addFileStatus(proc.fileSystem.newDir("/some/home/mydir/dir1"), proc.fileSystem.newDir("/some/home/mydir/dir1/list_exception_java.io.InterruptedIOException")); + + runner.setIncomingConnection(false); + runner.setProperty(GetHDFSFileInfo.FULL_PATH, "/some/home/mydir"); + runner.setProperty(GetHDFSFileInfo.RECURSE_SUBDIRS, "true"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_DIRS, "true"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_FILES, "true"); + runner.setProperty(GetHDFSFileInfo.DESTINATION, GetHDFSFileInfo.DESTINATION_CONTENT); + runner.setProperty(GetHDFSFileInfo.GROUPING, GetHDFSFileInfo.GROUP_ALL); + + runner.run(); + + runner.assertTransferCount(GetHDFSFileInfo.REL_ORIGINAL, 0); + runner.assertTransferCount(GetHDFSFileInfo.REL_SUCCESS, 1); + runner.assertTransferCount(GetHDFSFileInfo.REL_FAILURE, 0); + runner.assertTransferCount(GetHDFSFileInfo.REL_NOT_FOUND, 0); + + final MockFlowFile mff = runner.getFlowFilesForRelationship(GetHDFSFileInfo.REL_SUCCESS).get(0); + mff.assertContentEquals(Paths.get("src/test/resources/TestGetHDFSFileInfo/testRunWithPermissionsExceptionContent.json")); + } + + @Test + public void testObjectNotFound() throws InterruptedException { + + setFileSystemBasicTree(proc.fileSystem); + + runner.setIncomingConnection(false); + runner.setProperty(GetHDFSFileInfo.FULL_PATH, "/some/home/mydir/ObjectThatDoesNotExist"); + runner.setProperty(GetHDFSFileInfo.RECURSE_SUBDIRS, "true"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_DIRS, "true"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_FILES, "true"); + runner.setProperty(GetHDFSFileInfo.DESTINATION, GetHDFSFileInfo.DESTINATION_CONTENT); + + runner.run(); + + runner.assertTransferCount(GetHDFSFileInfo.REL_ORIGINAL, 0); + runner.assertTransferCount(GetHDFSFileInfo.REL_SUCCESS, 0); + runner.assertTransferCount(GetHDFSFileInfo.REL_FAILURE, 0); + runner.assertTransferCount(GetHDFSFileInfo.REL_NOT_FOUND, 1); + } + + @Test 
+ public void testRecursive() throws InterruptedException { + + setFileSystemBasicTree(proc.fileSystem); + + runner.setIncomingConnection(false); + runner.setProperty(GetHDFSFileInfo.FULL_PATH, "/some/home/mydir"); + runner.setProperty(GetHDFSFileInfo.RECURSE_SUBDIRS, "true"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_DIRS, "true"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_FILES, "true"); + runner.setProperty(GetHDFSFileInfo.DESTINATION, GetHDFSFileInfo.DESTINATION_CONTENT); + + runner.run(); + + runner.assertTransferCount(GetHDFSFileInfo.REL_ORIGINAL, 0); + runner.assertTransferCount(GetHDFSFileInfo.REL_SUCCESS, 1); + runner.assertTransferCount(GetHDFSFileInfo.REL_FAILURE, 0); + runner.assertTransferCount(GetHDFSFileInfo.REL_NOT_FOUND, 0); + } + + @Test + public void testRecursiveGroupAllToAttributes() throws Exception { + + setFileSystemBasicTree(proc.fileSystem); + + runner.setIncomingConnection(false); + runner.setProperty(GetHDFSFileInfo.FULL_PATH, "/some/home/mydir"); + runner.setProperty(GetHDFSFileInfo.RECURSE_SUBDIRS, "true"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_DIRS, "true"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_FILES, "true"); + runner.setProperty(GetHDFSFileInfo.DESTINATION, GetHDFSFileInfo.DESTINATION_ATTRIBUTES); + runner.setProperty(GetHDFSFileInfo.GROUPING, GetHDFSFileInfo.GROUP_ALL); + + runner.run(); + + runner.assertTransferCount(GetHDFSFileInfo.REL_ORIGINAL, 0); + runner.assertTransferCount(GetHDFSFileInfo.REL_SUCCESS, 1); + runner.assertTransferCount(GetHDFSFileInfo.REL_FAILURE, 0); + runner.assertTransferCount(GetHDFSFileInfo.REL_NOT_FOUND, 0); + + final MockFlowFile mff = runner.getFlowFilesForRelationship(GetHDFSFileInfo.REL_SUCCESS).get(0); + mff.assertAttributeEquals("hdfs.objectName", "mydir"); + mff.assertAttributeEquals("hdfs.path", "/some/home"); + mff.assertAttributeEquals("hdfs.type", "directory"); + mff.assertAttributeEquals("hdfs.owner", "owner"); + mff.assertAttributeEquals("hdfs.group", "group"); + mff.assertAttributeEquals("hdfs.lastModified", ""+1523456000000L); + mff.assertAttributeEquals("hdfs.length", ""+500); + mff.assertAttributeEquals("hdfs.count.files", ""+5); + mff.assertAttributeEquals("hdfs.count.dirs", ""+10); + mff.assertAttributeEquals("hdfs.replication", ""+3); + mff.assertAttributeEquals("hdfs.permissions", "rwxr-xr-x"); + mff.assertAttributeNotExists("hdfs.status"); + + final String expected = new String(Files.readAllBytes(Paths.get("src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupAllToAttributes.json"))); + mff.assertAttributeEquals("hdfs.full.tree", expected); + } + + @Test + public void testRecursiveGroupNoneToAttributes() throws InterruptedException { + + setFileSystemBasicTree(proc.fileSystem); + + runner.setIncomingConnection(false); + runner.setProperty(GetHDFSFileInfo.FULL_PATH, "/some/home/mydir"); + runner.setProperty(GetHDFSFileInfo.RECURSE_SUBDIRS, "true"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_DIRS, "true"); + runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_FILES, "true"); + runner.setProperty(GetHDFSFileInfo.DESTINATION, GetHDFSFileInfo.DESTINATION_ATTRIBUTES); + runner.setProperty(GetHDFSFileInfo.GROUPING, GetHDFSFileInfo.GROUP_NONE); + + runner.run(); + + runner.assertTransferCount(GetHDFSFileInfo.REL_ORIGINAL, 0); + runner.assertTransferCount(GetHDFSFileInfo.REL_SUCCESS, 7); + runner.assertTransferCount(GetHDFSFileInfo.REL_FAILURE, 0); + runner.assertTransferCount(GetHDFSFileInfo.REL_NOT_FOUND, 0); + + int matchCount = 0; + for (final MockFlowFile 
mff : runner.getFlowFilesForRelationship(GetHDFSFileInfo.REL_SUCCESS)) {
+            if (mff.getAttribute("hdfs.objectName").equals("mydir")) {
+                matchCount++;
+                mff.assertAttributeEquals("hdfs.path", "/some/home");
+                mff.assertAttributeEquals("hdfs.type", "directory");
+                mff.assertAttributeEquals("hdfs.owner", "owner");
+                mff.assertAttributeEquals("hdfs.group", "group");
+                mff.assertAttributeEquals("hdfs.lastModified", ""+1523456000000L);
+                mff.assertAttributeEquals("hdfs.length", ""+500);
+                mff.assertAttributeEquals("hdfs.count.files", ""+5);
+                mff.assertAttributeEquals("hdfs.count.dirs", ""+10);
+                mff.assertAttributeEquals("hdfs.replication", ""+3);
+                mff.assertAttributeEquals("hdfs.permissions", "rwxr-xr-x");
+                mff.assertAttributeNotExists("hdfs.status");
+            } else if (mff.getAttribute("hdfs.objectName").equals("dir1")) {
+                matchCount++;
+                mff.assertAttributeEquals("hdfs.path", "/some/home/mydir");
+                mff.assertAttributeEquals("hdfs.type", "directory");
+                mff.assertAttributeEquals("hdfs.owner", "owner");
+                mff.assertAttributeEquals("hdfs.group", "group");
+                mff.assertAttributeEquals("hdfs.lastModified", ""+1523456000000L);
+                mff.assertAttributeEquals("hdfs.length", ""+200);
+                mff.assertAttributeEquals("hdfs.count.files", ""+2);
+                mff.assertAttributeEquals("hdfs.count.dirs", ""+3);
+                mff.assertAttributeEquals("hdfs.replication", ""+3);
+                mff.assertAttributeEquals("hdfs.permissions", "rwxr-xr-x");
+                mff.assertAttributeNotExists("hdfs.status");
+            } else if (mff.getAttribute("hdfs.objectName").equals("dir2")) {
+                matchCount++;
+                mff.assertAttributeEquals("hdfs.path", "/some/home/mydir");
+                mff.assertAttributeEquals("hdfs.type", "directory");
+                mff.assertAttributeEquals("hdfs.owner", "owner");
+                mff.assertAttributeEquals("hdfs.group", "group");
+                mff.assertAttributeEquals("hdfs.lastModified", ""+1523456000000L);
+                mff.assertAttributeEquals("hdfs.length", ""+200);
+                mff.assertAttributeEquals("hdfs.count.files", ""+2);
+                mff.assertAttributeEquals("hdfs.count.dirs", ""+3);
+                mff.assertAttributeEquals("hdfs.replication", ""+3);
+                mff.assertAttributeEquals("hdfs.permissions", "rwxr-xr-x");
+                mff.assertAttributeNotExists("hdfs.status");
+            } else if (mff.getAttribute("hdfs.objectName").equals("regDir")) {
+                matchCount++;
+                mff.assertAttributeEquals("hdfs.path", "/some/home/mydir/dir1");
+                mff.assertAttributeEquals("hdfs.type", "directory");
+                mff.assertAttributeEquals("hdfs.owner", "owner");
+                mff.assertAttributeEquals("hdfs.group", "group");
+                mff.assertAttributeEquals("hdfs.lastModified", ""+1523456000000L);
+                mff.assertAttributeEquals("hdfs.length", ""+0);
+                mff.assertAttributeEquals("hdfs.count.files", ""+0);
+                mff.assertAttributeEquals("hdfs.count.dirs", ""+1);
+                mff.assertAttributeEquals("hdfs.replication", ""+3);
+                mff.assertAttributeEquals("hdfs.permissions", "rwxr-xr-x");
+                mff.assertAttributeNotExists("hdfs.status");
+            } else if (mff.getAttribute("hdfs.objectName").equals("regDir2")) {
+                matchCount++;
+                mff.assertAttributeEquals("hdfs.path", "/some/home/mydir/dir2");
+                mff.assertAttributeEquals("hdfs.type", "directory");
+                mff.assertAttributeEquals("hdfs.owner", "owner");
+                mff.assertAttributeEquals("hdfs.group", "group");
+                mff.assertAttributeEquals("hdfs.lastModified", ""+1523456000000L);
+                mff.assertAttributeEquals("hdfs.length", ""+0);
+                mff.assertAttributeEquals("hdfs.count.files", ""+0);
+                mff.assertAttributeEquals("hdfs.count.dirs", ""+1);
+                mff.assertAttributeEquals("hdfs.replication", ""+3);
+                mff.assertAttributeEquals("hdfs.permissions", "rwxr-xr-x");
+                mff.assertAttributeNotExists("hdfs.status");
+            } else if (mff.getAttribute("hdfs.objectName").equals("regFile")) {
+                matchCount++;
+                mff.assertAttributeEquals("hdfs.path", "/some/home/mydir/dir1");
+                mff.assertAttributeEquals("hdfs.type", "file");
+                mff.assertAttributeEquals("hdfs.owner", "owner");
+                mff.assertAttributeEquals("hdfs.group", "group");
+                mff.assertAttributeEquals("hdfs.lastModified", ""+1523456000000L);
+                mff.assertAttributeEquals("hdfs.length", ""+100);
+                mff.assertAttributeNotExists("hdfs.count.files");
+                mff.assertAttributeNotExists("hdfs.count.dirs");
+                mff.assertAttributeEquals("hdfs.replication", ""+3);
+                mff.assertAttributeEquals("hdfs.permissions", "rw-r--r--");
+                mff.assertAttributeNotExists("hdfs.status");
+            } else if (mff.getAttribute("hdfs.objectName").equals("regFile2")) {
+                matchCount++;
+                mff.assertAttributeEquals("hdfs.path", "/some/home/mydir/dir2");
+                mff.assertAttributeEquals("hdfs.type", "file");
+                mff.assertAttributeEquals("hdfs.owner", "owner");
+                mff.assertAttributeEquals("hdfs.group", "group");
+                mff.assertAttributeEquals("hdfs.lastModified", ""+1523456000000L);
+                mff.assertAttributeEquals("hdfs.length", ""+100);
+                mff.assertAttributeNotExists("hdfs.count.files");
+                mff.assertAttributeNotExists("hdfs.count.dirs");
+                mff.assertAttributeEquals("hdfs.replication", ""+3);
+                mff.assertAttributeEquals("hdfs.permissions", "rw-r--r--");
+                mff.assertAttributeNotExists("hdfs.status");
+            } else {
+                runner.assertNotValid();
+            }
+        }
+        Assert.assertEquals(matchCount, 7);
+    }
+
+    @Test
+    public void testRecursiveGroupDirToAttributes() throws Exception {
+
+        setFileSystemBasicTree(proc.fileSystem);
+
+        runner.setIncomingConnection(false);
+        runner.setProperty(GetHDFSFileInfo.FULL_PATH, "/some/home/mydir");
+        runner.setProperty(GetHDFSFileInfo.RECURSE_SUBDIRS, "true");
+        runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_DIRS, "true");
+        runner.setProperty(GetHDFSFileInfo.IGNORE_DOTTED_FILES, "true");
+        runner.setProperty(GetHDFSFileInfo.DESTINATION, GetHDFSFileInfo.DESTINATION_ATTRIBUTES);
+        runner.setProperty(GetHDFSFileInfo.GROUPING, GetHDFSFileInfo.GROUP_PARENT_DIR);
+
+        runner.run();
+
+        runner.assertTransferCount(GetHDFSFileInfo.REL_ORIGINAL, 0);
+        runner.assertTransferCount(GetHDFSFileInfo.REL_SUCCESS, 5);
+        runner.assertTransferCount(GetHDFSFileInfo.REL_FAILURE, 0);
+        runner.assertTransferCount(GetHDFSFileInfo.REL_NOT_FOUND, 0);
+
+        int matchCount = 0;
+        for (final MockFlowFile mff : runner.getFlowFilesForRelationship(GetHDFSFileInfo.REL_SUCCESS)) {
+            if (mff.getAttribute("hdfs.objectName").equals("mydir")) {
+                matchCount++;
+                mff.assertAttributeEquals("hdfs.path", "/some/home");
+                mff.assertAttributeEquals("hdfs.type", "directory");
+                mff.assertAttributeEquals("hdfs.owner", "owner");
+                mff.assertAttributeEquals("hdfs.group", "group");
+                mff.assertAttributeEquals("hdfs.lastModified", ""+1523456000000L);
+                mff.assertAttributeEquals("hdfs.length", ""+500);
+                mff.assertAttributeEquals("hdfs.count.files", ""+5);
+                mff.assertAttributeEquals("hdfs.count.dirs", ""+10);
+                mff.assertAttributeEquals("hdfs.replication", ""+3);
+                mff.assertAttributeEquals("hdfs.permissions", "rwxr-xr-x");
+                mff.assertAttributeNotExists("hdfs.status");
+                final String expected = new String(Files.readAllBytes(Paths.get("src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-mydir.json")));
+                mff.assertAttributeEquals("hdfs.full.tree", expected);
+            } else if (mff.getAttribute("hdfs.objectName").equals("dir1")) {
+                matchCount++;
+                mff.assertAttributeEquals("hdfs.path", "/some/home/mydir");
+                mff.assertAttributeEquals("hdfs.type", "directory");
+                mff.assertAttributeEquals("hdfs.owner", "owner");
+                mff.assertAttributeEquals("hdfs.group", "group");
+                mff.assertAttributeEquals("hdfs.lastModified", ""+1523456000000L);
+                mff.assertAttributeEquals("hdfs.length", ""+200);
+                mff.assertAttributeEquals("hdfs.count.files", ""+2);
+                mff.assertAttributeEquals("hdfs.count.dirs", ""+3);
+                mff.assertAttributeEquals("hdfs.replication", ""+3);
+                mff.assertAttributeEquals("hdfs.permissions", "rwxr-xr-x");
+                mff.assertAttributeNotExists("hdfs.status");
+                final String expected = new String(Files.readAllBytes(Paths.get("src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-dir1.json")));
+                mff.assertAttributeEquals("hdfs.full.tree", expected);
+            } else if (mff.getAttribute("hdfs.objectName").equals("dir2")) {
+                matchCount++;
+                mff.assertAttributeEquals("hdfs.path", "/some/home/mydir");
+                mff.assertAttributeEquals("hdfs.type", "directory");
+                mff.assertAttributeEquals("hdfs.owner", "owner");
+                mff.assertAttributeEquals("hdfs.group", "group");
+                mff.assertAttributeEquals("hdfs.lastModified", ""+1523456000000L);
+                mff.assertAttributeEquals("hdfs.length", ""+200);
+                mff.assertAttributeEquals("hdfs.count.files", ""+2);
+                mff.assertAttributeEquals("hdfs.count.dirs", ""+3);
+                mff.assertAttributeEquals("hdfs.replication", ""+3);
+                mff.assertAttributeEquals("hdfs.permissions", "rwxr-xr-x");
+                mff.assertAttributeNotExists("hdfs.status");
+                final String expected = new String(Files.readAllBytes(Paths.get("src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-dir2.json")));
+                mff.assertAttributeEquals("hdfs.full.tree", expected);
+            } else if (mff.getAttribute("hdfs.objectName").equals("regDir")) {
+                matchCount++;
+                mff.assertAttributeEquals("hdfs.path", "/some/home/mydir/dir1");
+                mff.assertAttributeEquals("hdfs.type", "directory");
+                mff.assertAttributeEquals("hdfs.owner", "owner");
+                mff.assertAttributeEquals("hdfs.group", "group");
+                mff.assertAttributeEquals("hdfs.lastModified", ""+1523456000000L);
+                mff.assertAttributeEquals("hdfs.length", ""+0);
+                mff.assertAttributeEquals("hdfs.count.files", ""+0);
+                mff.assertAttributeEquals("hdfs.count.dirs", ""+1);
+                mff.assertAttributeEquals("hdfs.replication", ""+3);
+                mff.assertAttributeEquals("hdfs.permissions", "rwxr-xr-x");
+                mff.assertAttributeNotExists("hdfs.status");
+                final String expected = new String(Files.readAllBytes(Paths.get("src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-regDir.json")));
+                mff.assertAttributeEquals("hdfs.full.tree", expected);
+            } else if (mff.getAttribute("hdfs.objectName").equals("regDir2")) {
+                matchCount++;
+                mff.assertAttributeEquals("hdfs.path", "/some/home/mydir/dir2");
+                mff.assertAttributeEquals("hdfs.type", "directory");
+                mff.assertAttributeEquals("hdfs.owner", "owner");
+                mff.assertAttributeEquals("hdfs.group", "group");
+                mff.assertAttributeEquals("hdfs.lastModified", ""+1523456000000L);
+                mff.assertAttributeEquals("hdfs.length", ""+0);
+                mff.assertAttributeEquals("hdfs.count.files", ""+0);
+                mff.assertAttributeEquals("hdfs.count.dirs", ""+1);
+                mff.assertAttributeEquals("hdfs.replication", ""+3);
+                mff.assertAttributeEquals("hdfs.permissions", "rwxr-xr-x");
+                mff.assertAttributeNotExists("hdfs.status");
+                final String expected = new String(Files.readAllBytes(Paths.get("src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-regDir2.json")));
+                mff.assertAttributeEquals("hdfs.full.tree", expected);
+            } else {
+                runner.assertNotValid();
+            }
+        }
+        Assert.assertEquals(matchCount, 5);
+    }
+
+    /*
+     * For all basic tests, this provides a structure of files in dirs:
+     * Total number of dirs: 9 (1 root, 4 dotted)
+     * Total number of files: 4 (2 dotted)
+     */
+    protected void setFileSystemBasicTree(final MockFileSystem fs) {
+        fs.addFileStatus(fs.newDir("/some/home/mydir"), fs.newDir("/some/home/mydir/dir1"));
+        fs.addFileStatus(fs.newDir("/some/home/mydir"), fs.newDir("/some/home/mydir/dir2"));
+        fs.addFileStatus(fs.newDir("/some/home/mydir"), fs.newDir("/some/home/mydir/.dir3"));
+        fs.addFileStatus(fs.newDir("/some/home/mydir"), fs.newDir("/some/home/mydir/.dir4"));
+
+        fs.addFileStatus(fs.newDir("/some/home/mydir/dir1"), fs.newFile("/some/home/mydir/dir1/.dotFile"));
+        fs.addFileStatus(fs.newDir("/some/home/mydir/dir1"), fs.newDir("/some/home/mydir/dir1/.dotDir"));
+        fs.addFileStatus(fs.newDir("/some/home/mydir/dir1"), fs.newFile("/some/home/mydir/dir1/regFile"));
+        fs.addFileStatus(fs.newDir("/some/home/mydir/dir1"), fs.newDir("/some/home/mydir/dir1/regDir"));
+
+        fs.addFileStatus(fs.newDir("/some/home/mydir/dir2"), fs.newFile("/some/home/mydir/dir2/.dotFile2"));
+        fs.addFileStatus(fs.newDir("/some/home/mydir/dir2"), fs.newDir("/some/home/mydir/dir2/.dotDir2"));
+        fs.addFileStatus(fs.newDir("/some/home/mydir/dir2"), fs.newFile("/some/home/mydir/dir2/regFile2"));
+        fs.addFileStatus(fs.newDir("/some/home/mydir/dir2"), fs.newDir("/some/home/mydir/dir2/regDir2"));
+
+        fs.addFileStatus(fs.newDir("/some/home/mydir/.dir3"), fs.newFile("/some/home/mydir/.dir3/regFile3"));
+        fs.addFileStatus(fs.newDir("/some/home/mydir/.dir3"), fs.newDir("/some/home/mydir/.dir3/regDir3"));
+    }
+
+    static FsPermission perms(short p) {
+        return new FsPermission(p);
+    }
+
+
+    private class GetHDFSFileInfoWithMockedFileSystem extends GetHDFSFileInfo {
+        private final MockFileSystem fileSystem = new MockFileSystem();
+        private final KerberosProperties testKerberosProps;
+
+        public GetHDFSFileInfoWithMockedFileSystem(KerberosProperties kerberosProperties) {
+            this.testKerberosProps = kerberosProperties;
+        }
+
+        @Override
+        protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
+            return testKerberosProps;
+        }
+
+        @Override
+        protected FileSystem getFileSystem() {
+            return fileSystem;
+        }
+
+        @Override
+        protected FileSystem getFileSystem(final Configuration config) throws IOException {
+            return fileSystem;
+        }
+    }
+
+    private class MockFileSystem extends FileSystem {
+        private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
+        private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
+
+        public void addFileStatus(final FileStatus parent, final FileStatus child) {
+            Set<FileStatus> children = fileStatuses.get(parent.getPath());
+            if (children == null) {
+                children = new HashSet<>();
+                fileStatuses.put(parent.getPath(), children);
+            }
+            if (child != null) {
+                children.add(child);
+                if (child.isDirectory() && !fileStatuses.containsKey(child.getPath())) {
+                    fileStatuses.put(child.getPath(), new HashSet<FileStatus>());
+                }
+            }
+
+            pathToStatus.put(parent.getPath(), parent);
+            pathToStatus.put(child.getPath(), child);
+        }
+
+        @Override
+        public long getDefaultBlockSize() {
+            return 1024L;
+        }
+
+        @Override
+        public short getDefaultReplication() {
+            return 1;
+        }
+
+        @Override
+        public URI getUri() {
+            return null;
+        }
+
+        @Override
+        public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
+            return null;
+        }
+
+        @Override
+        public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
+                final long blockSize, final Progressable progress) throws IOException {
+            return null;
+        }
+
+        @Override
+        public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException {
+            return null;
+        }
+
+        @Override
+        public boolean rename(final Path src, final Path dst) throws IOException {
+            return false;
+        }
+
+        @Override
+        public boolean delete(final Path f, final boolean recursive) throws IOException {
+            return false;
+        }
+
+        @Override
+        public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException {
+            if (!fileStatuses.containsKey(f)) {
+                throw new FileNotFoundException();
+            }
+            if (f.getName().startsWith("list_exception_")) {
+                String clzName = f.getName().substring("list_exception_".length(), f.getName().length());
+                IOException exception = null;
+                try {
+                    exception = (IOException)Class.forName(clzName).newInstance();
+                } catch (Throwable t) {
+                    throw new RuntimeException(t);
+                }
+                throw exception;
+            }
+            final Set<FileStatus> statuses = fileStatuses.get(f);
+            if (statuses == null) {
+                return new FileStatus[0];
+            }
+
+            for (FileStatus s : statuses) {
+                getFileStatus(s.getPath()); //support exception handling only.
+            }
+
+            return statuses.toArray(new FileStatus[statuses.size()]);
+        }
+
+        @Override
+        public void setWorkingDirectory(final Path new_dir) {
+
+        }
+
+        @Override
+        public Path getWorkingDirectory() {
+            return new Path(new File(".").getAbsolutePath());
+        }
+
+        @Override
+        public boolean mkdirs(final Path f, final FsPermission permission) throws IOException {
+            return false;
+        }
+
+        @Override
+        public FileStatus getFileStatus(final Path f) throws IOException {
+            if (f != null && f.getName().startsWith("exception_")) {
+                String clzName = f.getName().substring("exception_".length(), f.getName().length());
+                IOException exception = null;
+                try {
+                    exception = (IOException)Class.forName(clzName).newInstance();
+                } catch (Throwable t) {
+                    throw new RuntimeException(t);
+                }
+                throw exception;
+            }
+            final FileStatus fileStatus = pathToStatus.get(f);
+            if (fileStatus == null) throw new FileNotFoundException();
+            return fileStatus;
+        }
+
+        public FileStatus newFile(String p) {
+            return new FileStatus(100L, false, 3, 128*1024*1024, 1523456000000L, 1523457000000L, perms((short)0644), "owner", "group", new Path(p));
+        }
+        public FileStatus newDir(String p) {
+            return new FileStatus(1L, true, 3, 128*1024*1024, 1523456000000L, 1523457000000L, perms((short)0755), "owner", "group", new Path(p));
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupAllToAttributes.json b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupAllToAttributes.json
new file mode 100644
index 0000000000..096f244d5f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupAllToAttributes.json
@@ -0,0 +1 @@
+{"objectName":"mydir","path":"/some/home","type":"directory","owner":"owner","group":"group","lastModified":"1523456000000","length":"500","replication":"3","countFiles":"5","countDirs":"10","permissions":"rwxr-xr-x","content":[{"objectName":"dir1","path":"/some/home/mydir","type":"directory","owner":"owner","group":"group","lastModified":"1523456000000","length":"200","replication":"3","countFiles":"2","countDirs":"3","permissions":"rwxr-xr-x","content":[{"objectName":"regFile","path":"/some/home/mydir/dir1","type":"file","owner":"owner","group":"group","lastModified":"1523456000000","length":"100","replication":"3","permissions":"rw-r--r--"},{"objectName":"regDir","path":"/some/home/mydir/dir1","type":"directory","owner":"owner","group":"group","lastModified":"1523456000000","length":"0","replication":"3","countFiles":"0","countDirs":"1","permissions":"rwxr-xr-x"}]},{"objectName":"dir2","path":"/some/home/mydir","type":"directory","owner":"owner","group":"group","lastModified":"1523456000000","length":"200","replication":"3","countFiles":"2","countDirs":"3","permissions":"rwxr-xr-x","content":[{"objectName":"regDir2","path":"/some/home/mydir/dir2","type":"directory","owner":"owner","group":"group","lastModified":"1523456000000","length":"0","replication":"3","countFiles":"0","countDirs":"1","permissions":"rwxr-xr-x"},{"objectName":"regFile2","path":"/some/home/mydir/dir2","type":"file","owner":"owner","group":"group","lastModified":"1523456000000","length":"100","replication":"3","permissions":"rw-r--r--"}]}]} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-dir1.json b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-dir1.json new file mode 100644 index 0000000000..2eae39594d --- /dev/null +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-dir1.json @@ -0,0 +1 @@ +{"objectName":"dir1","path":"/some/home/mydir","type":"directory","owner":"owner","group":"group","lastModified":"1523456000000","length":"200","replication":"3","countFiles":"2","countDirs":"3","permissions":"rwxr-xr-x","content":[{"objectName":"regFile","path":"/some/home/mydir/dir1","type":"file","owner":"owner","group":"group","lastModified":"1523456000000","length":"100","replication":"3","permissions":"rw-r--r--"},{"objectName":"regDir","path":"/some/home/mydir/dir1","type":"directory","owner":"owner","group":"group","lastModified":"1523456000000","length":"0","replication":"3","countFiles":"0","countDirs":"1","permissions":"rwxr-xr-x"}]} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-dir2.json b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-dir2.json new file mode 100644 index 0000000000..ed9628bfad --- /dev/null +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-dir2.json @@ -0,0 +1 @@ 
+{"objectName":"dir2","path":"/some/home/mydir","type":"directory","owner":"owner","group":"group","lastModified":"1523456000000","length":"200","replication":"3","countFiles":"2","countDirs":"3","permissions":"rwxr-xr-x","content":[{"objectName":"regDir2","path":"/some/home/mydir/dir2","type":"directory","owner":"owner","group":"group","lastModified":"1523456000000","length":"0","replication":"3","countFiles":"0","countDirs":"1","permissions":"rwxr-xr-x"},{"objectName":"regFile2","path":"/some/home/mydir/dir2","type":"file","owner":"owner","group":"group","lastModified":"1523456000000","length":"100","replication":"3","permissions":"rw-r--r--"}]} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-mydir.json b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-mydir.json new file mode 100644 index 0000000000..e1e7d63bb5 --- /dev/null +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-mydir.json @@ -0,0 +1 @@ +{"objectName":"mydir","path":"/some/home","type":"directory","owner":"owner","group":"group","lastModified":"1523456000000","length":"500","replication":"3","countFiles":"5","countDirs":"10","permissions":"rwxr-xr-x","content":[{"objectName":"dir1","path":"/some/home/mydir","type":"directory","owner":"owner","group":"group","lastModified":"1523456000000","length":"200","replication":"3","countFiles":"2","countDirs":"3","permissions":"rwxr-xr-x"},{"objectName":"dir2","path":"/some/home/mydir","type":"directory","owner":"owner","group":"group","lastModified":"1523456000000","length":"200","replication":"3","countFiles":"2","countDirs":"3","permissions":"rwxr-xr-x"}]} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-regDir.json b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-regDir.json new file mode 100644 index 0000000000..b680858565 --- /dev/null +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-regDir.json @@ -0,0 +1 @@ +{"objectName":"regDir","path":"/some/home/mydir/dir1","type":"directory","owner":"owner","group":"group","lastModified":"1523456000000","length":"0","replication":"3","countFiles":"0","countDirs":"1","permissions":"rwxr-xr-x"} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-regDir2.json b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-regDir2.json new file mode 100644 index 0000000000..cfdec7cd80 --- /dev/null +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRecursiveGroupDirToAttributes-regDir2.json @@ -0,0 +1 @@ +{"objectName":"regDir2","path":"/some/home/mydir/dir2","type":"directory","owner":"owner","group":"group","lastModified":"1523456000000","length":"0","replication":"3","countFiles":"0","countDirs":"1","permissions":"rwxr-xr-x"} \ No newline at end of file diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRunWithPermissionsExceptionContent.json b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRunWithPermissionsExceptionContent.json new file mode 100644 index 0000000000..b21cf043fd --- /dev/null +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/TestGetHDFSFileInfo/testRunWithPermissionsExceptionContent.json @@ -0,0 +1 @@ +{"objectName":"mydir","path":"/some/home","type":"directory","owner":"owner","group":"group","lastModified":"1523456000000","length":"500","replication":"3","countFiles":"5","countDirs":"11","permissions":"rwxr-xr-x","content":[{"objectName":"dir1","path":"/some/home/mydir","type":"directory","owner":"owner","group":"group","lastModified":"1523456000000","length":"200","replication":"3","countFiles":"2","countDirs":"4","permissions":"rwxr-xr-x","content":[{"objectName":"list_exception_java.io.InterruptedIOException","path":"/some/home/mydir/dir1","type":"directory","owner":"owner","group":"group","lastModified":"1523456000000","length":"0","replication":"3","countFiles":"0","countDirs":"1","permissions":"rwxr-xr-x","status":"Couldn't list directory: java.io.InterruptedIOException"},{"objectName":"regFile","path":"/some/home/mydir/dir1","type":"file","owner":"owner","group":"group","lastModified":"1523456000000","length":"100","replication":"3","permissions":"rw-r--r--"},{"objectName":"regDir","path":"/some/home/mydir/dir1","type":"directory","owner":"owner","group":"group","lastModified":"1523456000000","length":"0","replication":"3","countFiles":"0","countDirs":"1","permissions":"rwxr-xr-x"}]},{"objectName":"dir2","path":"/some/home/mydir","type":"directory","owner":"owner","group":"group","lastModified":"1523456000000","length":"200","replication":"3","countFiles":"2","countDirs":"3","permissions":"rwxr-xr-x","content":[{"objectName":"regDir2","path":"/some/home/mydir/dir2","type":"directory","owner":"owner","group":"group","lastModified":"1523456000000","length":"0","replication":"3","countFiles":"0","countDirs":"1","permissions":"rwxr-xr-x"},{"objectName":"regFile2","path":"/some/home/mydir/dir2","type":"file","owner":"owner","group":"group","lastModified":"1523456000000","length":"100","replication":"3","permissions":"rw-r--r--"}]}]} \ No newline at end of file
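Note on the last fixture: testRunWithPermissionsExceptionContent.json exercises MockFileSystem's name-based fault injection, where getFileStatus()/listStatus() throw the IOException whose fully qualified class name follows the "exception_" / "list_exception_" prefix of a path's final segment. A minimal sketch of how such an entry could be registered with the helpers shown above (the exact call site is an assumption, not quoted from the patch; fs stands for the proc.fileSystem mock used by the tests):

    // Hypothetical setup: when the processor recurses into this child directory,
    // MockFileSystem.listStatus() throws java.io.InterruptedIOException (the class named
    // after the "list_exception_" prefix), so the child's entry in the result gets
    // "status":"Couldn't list directory: ..." instead of aborting the whole listing.
    fs.addFileStatus(fs.newDir("/some/home/mydir/dir1"),
            fs.newDir("/some/home/mydir/dir1/list_exception_java.io.InterruptedIOException"));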