NIFI-6113 Refactoring GetHDFSFileInfo to remove instance level variable to make it stateless.

This closes #4837.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
Authored by Stan Antyufeev on 2021-02-22 18:46:09 -06:00; committed by Peter Turcsanyi
parent 2dedd8bf0f
commit 2322b2cddf
1 changed file with 217 additions and 207 deletions
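The change replaces the processor's cached HDFSFileInfoRequest field (and the onPropertyModified reset that invalidated it) with a request object built inside onTrigger and passed down the call chain. Below is a minimal, NiFi-independent sketch of that pattern; the Request class, attribute map, and walkTree stub are illustrative stand-ins, not the processor's real API.

// Simplified sketch of the stateless pattern applied here (stand-ins, not the NiFi API).
import java.util.Map;

class StatelessTriggerSketch {

    // Before: a field like "private HDFSFileInfoRequest req;" was cached across triggers
    // and nulled in onPropertyModified -- shared mutable state once values come from
    // per-FlowFile expression language and triggers may run concurrently.

    // After: everything derived from properties/attributes lives on the stack per call.
    static final class Request {            // stand-in for HDFSFileInfoRequest
        final String fullPath;
        final boolean recursive;
        Request(String fullPath, boolean recursive) {
            this.fullPath = fullPath;
            this.recursive = recursive;
        }
    }

    Request buildRequestDetails(Map<String, String> flowFileAttributes) {
        // stand-in for context.getProperty(...).evaluateAttributeExpressions(ff)
        return new Request(flowFileAttributes.getOrDefault("path", "/"), true);
    }

    void onTrigger(Map<String, String> flowFileAttributes) {
        Request req = buildRequestDetails(flowFileAttributes); // local, thread-confined
        walkTree(req);                                         // passed explicitly, never stored in a field
    }

    private void walkTree(Request req) {
        // recurse using req.fullPath / req.recursive
    }
}

Building the request on every trigger also lets each FlowFile evaluate its own filter expressions without the compare-and-recompile bookkeeping that updateRequestDetails previously needed.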

GetHDFSFileInfo.java

@@ -25,11 +25,9 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Pattern;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -57,7 +55,10 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Groupping;
+import static org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Grouping.ALL;
+import static org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Grouping.DIR;
+import static org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Grouping.NONE;
 @InputRequirement(Requirement.INPUT_ALLOWED)
 @Tags({"hadoop", "HCFS", "HDFS", "get", "list", "ingest", "source", "filesystem"})
@@ -65,29 +66,29 @@ import org.apache.nifi.processors.hadoop.GetHDFSFileInfo.HDFSFileInfoRequest.Gro
         + "This processor creates a FlowFile(s) that represents the HDFS file/dir with relevant information. "
         + "Main purpose of this processor to provide functionality similar to HDFS Client, i.e. count, du, ls, test, etc. "
         + "Unlike ListHDFS, this processor is stateless, supports incoming connections and provides information on a dir level. "
 )
 @WritesAttributes({
-    @WritesAttribute(attribute="hdfs.objectName", description="The name of the file/dir found on HDFS."),
+    @WritesAttribute(attribute = "hdfs.objectName", description = "The name of the file/dir found on HDFS."),
-    @WritesAttribute(attribute="hdfs.path", description="The path is set to the absolute path of the object's parent directory on HDFS. "
+    @WritesAttribute(attribute = "hdfs.path", description = "The path is set to the absolute path of the object's parent directory on HDFS. "
         + "For example, if an object is a directory 'foo', under directory '/bar' then 'hdfs.objectName' will have value 'foo', and 'hdfs.path' will be '/bar'"),
-    @WritesAttribute(attribute="hdfs.type", description="The type of an object. Possible values: directory, file, link"),
+    @WritesAttribute(attribute = "hdfs.type", description = "The type of an object. Possible values: directory, file, link"),
-    @WritesAttribute(attribute="hdfs.owner", description="The user that owns the object in HDFS"),
+    @WritesAttribute(attribute = "hdfs.owner", description = "The user that owns the object in HDFS"),
-    @WritesAttribute(attribute="hdfs.group", description="The group that owns the object in HDFS"),
+    @WritesAttribute(attribute = "hdfs.group", description = "The group that owns the object in HDFS"),
-    @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the object in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
+    @WritesAttribute(attribute = "hdfs.lastModified", description = "The timestamp of when the object in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
-    @WritesAttribute(attribute="hdfs.length", description=""
+    @WritesAttribute(attribute = "hdfs.length", description = ""
         + "In case of files: The number of bytes in the file in HDFS. "
         + "In case of dirs: Retuns storage space consumed by directory. "
         + ""),
-    @WritesAttribute(attribute="hdfs.count.files", description="In case of type='directory' will represent total count of files under this dir. "
+    @WritesAttribute(attribute = "hdfs.count.files", description = "In case of type='directory' will represent total count of files under this dir. "
         + "Won't be populated to other types of HDFS objects. "),
-    @WritesAttribute(attribute="hdfs.count.dirs", description="In case of type='directory' will represent total count of directories under this dir (including itself). "
+    @WritesAttribute(attribute = "hdfs.count.dirs", description = "In case of type='directory' will represent total count of directories under this dir (including itself). "
         + "Won't be populated to other types of HDFS objects. "),
-    @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for the file"),
+    @WritesAttribute(attribute = "hdfs.replication", description = "The number of HDFS replicas for the file"),
-    @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the object in HDFS. This is formatted as 3 characters for the owner, "
+    @WritesAttribute(attribute = "hdfs.permissions", description = "The permissions for the object in HDFS. This is formatted as 3 characters for the owner, "
         + "3 for the group, and 3 for other users. For example rw-rw-r--"),
-    @WritesAttribute(attribute="hdfs.status", description="The status contains comma separated list of file/dir paths, which couldn't be listed/accessed. "
+    @WritesAttribute(attribute = "hdfs.status", description = "The status contains comma separated list of file/dir paths, which couldn't be listed/accessed. "
         + "Status won't be set if no errors occured."),
-    @WritesAttribute(attribute="hdfs.full.tree", description="When destination is 'attribute', will be populated with full tree of HDFS directory in JSON format."
+    @WritesAttribute(attribute = "hdfs.full.tree", description = "When destination is 'attribute', will be populated with full tree of HDFS directory in JSON format."
         + "WARNING: In case when scan finds thousands or millions of objects, having huge values in attribute could impact flow file repo and GC/heap usage. "
         + "Use content destination for such cases")
 })
@@ -228,8 +229,6 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
             .description("All failed attempts to access HDFS will be routed to this relationship")
             .build();
-    private HDFSFileInfoRequest req;
     @Override
     protected void init(final ProcessorInitializationContext context) {
         super.init(context);
@@ -261,13 +260,6 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
         return relationships;
     }
-    @Override
-    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
-        super.onPropertyModified(descriptor, oldValue, newValue);
-        // drop request details to rebuild it
-        req = null;
-    }
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
         final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
@@ -311,20 +303,13 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
             ff = session.create();
             scheduledFF = true;
         }
-        if (req == null) {
-            //rebuild the request details based on
-            req = buildRequestDetails(context, session, ff);
-        }else {
-            //avoid rebuilding req object's patterns in order to have better performance
-            req = updateRequestDetails(context, session, ff);
-        }
+        HDFSFileInfoRequest req = buildRequestDetails(context, ff);
         try {
             final FileSystem hdfs = getFileSystem();
             UserGroupInformation ugi = getUserGroupInformation();
             ExecutionContext executionContext = new ExecutionContext();
-            HDFSObjectInfoDetails res = walkHDFSTree(context, session, executionContext, ff, hdfs, ugi, req, null, false);
+            HDFSObjectInfoDetails res = walkHDFSTree(session, executionContext, ff, hdfs, ugi, req, null, false);
             executionContext.finish(session);
             if (res == null) {
                 ff = session.putAttribute(ff, "hdfs.status", "Path not found: " + req.fullPath);
@@ -333,25 +318,18 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
             }
             if (!scheduledFF) {
                 session.transfer(ff, REL_ORIGINAL);
-            }else {
+            } else {
                 session.remove(ff);
             }
-        } catch (final IOException | IllegalArgumentException e) {
-            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e});
-            ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e);
-            session.transfer(ff, REL_FAILURE);
-            return;
         } catch (final InterruptedException e) {
             Thread.currentThread().interrupt();
             getLogger().error("Interrupted while performing listing of HDFS", e);
             ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e);
             session.transfer(ff, REL_FAILURE);
-            return;
         } catch (final Exception e) {
-            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e});
+            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[]{e});
             ff = session.putAttribute(ff, "hdfs.status", "Failed due to: " + e);
             session.transfer(ff, REL_FAILURE);
-            return;
         }
     }
@@ -359,23 +337,23 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
     /*
     * Walks thru HDFS tree. This method will return null to the main if there is no provided path existing.
     */
-    protected HDFSObjectInfoDetails walkHDFSTree(final ProcessContext context, final ProcessSession session, ExecutionContext executionContext,
-            FlowFile origFF, final FileSystem hdfs, final UserGroupInformation ugi, final HDFSFileInfoRequest req, HDFSObjectInfoDetails parent,
-            final boolean statsOnly
-    ) throws Exception{
+    protected HDFSObjectInfoDetails walkHDFSTree(final ProcessSession session, ExecutionContext executionContext,
+            FlowFile origFF, final FileSystem hdfs, final UserGroupInformation ugi,
+            final HDFSFileInfoRequest req, HDFSObjectInfoDetails parent, final boolean statsOnly
+    ) throws Exception {
         final HDFSObjectInfoDetails p = parent;
-        if (!ugi.doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.exists(p != null ? p.getPath() : new Path(req.fullPath)))) {
+        if (!ugi.doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.exists(p != null ? p.getPath() : new Path(req.getFullPath())))) {
             return null;
         }
         if (parent == null) {
-            parent = new HDFSObjectInfoDetails(ugi.doAs((PrivilegedExceptionAction<FileStatus>) () -> hdfs.getFileStatus(new Path(req.fullPath))));
+            parent = new HDFSObjectInfoDetails(ugi.doAs((PrivilegedExceptionAction<FileStatus>) () -> hdfs.getFileStatus(new Path(req.getFullPath()))));
         }
         if (parent.isFile() && p == null) {
             //single file path requested and found, lets send to output:
-            processHDFSObject(context, session, executionContext, origFF, req, parent, true);
+            processHDFSObject(session, executionContext, origFF, req, parent, true);
             return parent;
         }
@@ -384,26 +362,26 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
         FileStatus[] listFSt = null;
         try {
             listFSt = ugi.doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(path));
-        }catch (IOException e) {
+        } catch (IOException e) {
             parent.error = "Couldn't list directory: " + e;
-            processHDFSObject(context, session, executionContext, origFF, req, parent, p == null);
+            processHDFSObject(session, executionContext, origFF, req, parent, p == null);
             return parent; //File not found exception, or access denied - don't interrupt, just don't list
         }
         if (listFSt != null) {
             for (FileStatus f : listFSt) {
                 HDFSObjectInfoDetails o = new HDFSObjectInfoDetails(f);
                 HDFSObjectInfoDetails vo = validateMatchingPatterns(o, req);
-                if (o.isDirectory() && !o.isSymlink() && req.isRecursive) {
+                if (o.isDirectory() && !o.isSymlink() && req.isRecursive()) {
-                    o = walkHDFSTree(context, session, executionContext, origFF, hdfs, ugi, req, o, vo == null || statsOnly);
+                    o = walkHDFSTree(session, executionContext, origFF, hdfs, ugi, req, o, vo == null || statsOnly);
                     parent.countDirs += o.countDirs;
                     parent.totalLen += o.totalLen;
                     parent.countFiles += o.countFiles;
-                }else if (o.isDirectory() && o.isSymlink()) {
+                } else if (o.isDirectory() && o.isSymlink()) {
                     parent.countDirs += 1;
-                }else if (o.isFile() && !o.isSymlink()) {
+                } else if (o.isFile() && !o.isSymlink()) {
                     parent.countFiles += 1;
                     parent.totalLen += o.getLen();
-                }else if (o.isFile() && o.isSymlink()) {
+                } else if (o.isFile() && o.isSymlink()) {
                     parent.countFiles += 1; // do not add length of the symlink, as it doesn't consume space under THIS directory, but count files, as it is still an object.
                 }
@@ -411,14 +389,14 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
                 if (vo != null && !statsOnly) {
                     parent.addChild(vo);
                     if (vo.isFile() && !vo.isSymlink()) {
-                        processHDFSObject(context, session, executionContext, origFF, req, vo, false);
+                        processHDFSObject(session, executionContext, origFF, req, vo, false);
                     }
                 }
             }
             if (!statsOnly) {
-                processHDFSObject(context, session, executionContext, origFF, req, parent, p==null);
+                processHDFSObject(session, executionContext, origFF, req, parent, p == null);
             }
-            if (req.groupping != Groupping.ALL) {
+            if (req.getGrouping() != ALL) {
                 parent.setChildren(null); //we need children in full tree only when single output requested.
             }
         }
@@ -432,27 +410,25 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
         }
         if (o.isFile()) {
-            if (req.isIgnoreDotFiles && o.getPath().getName().startsWith(".")) {
+            if (req.isIgnoreDotFiles() && o.getPath().getName().startsWith(".")) {
                 return null;
-            }else if (req.fileExcludeFilter != null && req.fileExcludeFilter.matcher(o.getPath().getName()).matches()) {
+            } else if (req.getFileExcludeFilter() != null && req.getFileExcludeFilter().matcher(o.getPath().getName()).matches()) {
                 return null;
-            }else if (req.fileFilter == null) {
+            } else if (req.getFileFilter() != null && req.getFileFilter().matcher(o.getPath().getName()).matches()) {
                 return o;
-            }else if (req.fileFilter != null && req.fileFilter.matcher(o.getPath().getName()).matches()) {
+            } else if (req.getFileFilter() == null) {
                 return o;
-            }else {
+            }
             return null;
         }
-        }
         if (o.isDirectory()) {
-            if (req.isIgnoreDotDirs && o.getPath().getName().startsWith(".")) {
+            if (req.isIgnoreDotDirs() && o.getPath().getName().startsWith(".")) {
                 return null;
-            }else if (req.dirFilter == null) {
+            } else if (req.getDirFilter() != null && req.getDirFilter().matcher(o.getPath().getName()).matches()) {
                 return o;
-            }else if (req.dirFilter != null && req.dirFilter.matcher(o.getPath().getName()).matches()) {
+            } else if (req.getDirFilter() == null) {
                 return o;
-            }else {
-                return null;
             }
         }
         return null;
@@ -462,8 +438,7 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
     * Checks whether HDFS object should be sent to output.
     * If it should be sent, new flowfile will be created, its content and attributes will be populated according to other request params.
    */
-    protected HDFSObjectInfoDetails processHDFSObject(
-            final ProcessContext context,
+    protected void processHDFSObject(
             final ProcessSession session,
             final ExecutionContext executionContext,
             FlowFile origFF,
@@ -471,72 +446,70 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
             final HDFSObjectInfoDetails o,
             final boolean isRoot
     ) {
-        if (o.isFile() && req.groupping != Groupping.NONE) {
+        if (o.isFile() && req.getGrouping() != NONE) {
-            return null; //there is grouping by either root directory or every directory, no need to print separate files.
+            return;
         }
-        if (o.isDirectory() && o.isSymlink() && req.groupping != Groupping.NONE) {
+        if (o.isDirectory() && o.isSymlink() && req.getGrouping() != NONE) {
-            return null; //ignore symlink dirs an
+            return;
         }
-        if (o.isDirectory() && req.groupping == Groupping.ALL && !isRoot) {
+        if (o.isDirectory() && req.getGrouping() == ALL && !isRoot) {
-            return null;
+            return;
         }
         FlowFile ff = getReadyFlowFile(executionContext, session, origFF);
         //if destination type is content - always add mime type
-        if (req.isDestContent) {
+        if (req.isDestContent()) {
             ff = session.putAttribute(ff, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
         }
         //won't combine conditions for similar actions for better readability and maintenance.
-        if (o.isFile() && isRoot && req.isDestContent) {
+        if (o.isFile() && isRoot && req.isDestContent()) {
             ff = addAsContent(executionContext, session, o, ff);
         // ------------------------------
-        }else if (o.isFile() && isRoot && !req.isDestContent) {
+        } else if (o.isFile() && isRoot && !req.isDestContent()) {
             ff = addAsAttributes(session, o, ff);
         // ------------------------------
-        }else if (o.isFile() && req.isDestContent) {
+        } else if (o.isFile() && req.isDestContent()) {
             ff = addAsContent(executionContext, session, o, ff);
         // ------------------------------
-        }else if (o.isFile() && !req.isDestContent) {
+        } else if (o.isFile() && !req.isDestContent()) {
             ff = addAsAttributes(session, o, ff);
         // ------------------------------
-        }else if (o.isDirectory() && o.isSymlink() && req.isDestContent) {
+        } else if (o.isDirectory() && o.isSymlink() && req.isDestContent()) {
             ff = addAsContent(executionContext, session, o, ff);
         // ------------------------------
-        }else if (o.isDirectory() && o.isSymlink() && !req.isDestContent) {
+        } else if (o.isDirectory() && o.isSymlink() && !req.isDestContent()) {
             ff = addAsAttributes(session, o, ff);
         // ------------------------------
-        }else if (o.isDirectory() && req.groupping == Groupping.NONE && req.isDestContent) {
+        } else if (o.isDirectory() && req.getGrouping() == NONE && req.isDestContent()) {
             o.setChildren(null);
             ff = addAsContent(executionContext, session, o, ff);
         // ------------------------------
-        }else if (o.isDirectory() && req.groupping == Groupping.NONE && !req.isDestContent) {
+        } else if (o.isDirectory() && req.getGrouping() == NONE && !req.isDestContent()) {
             ff = addAsAttributes(session, o, ff);
         // ------------------------------
-        }else if (o.isDirectory() && req.groupping == Groupping.DIR && req.isDestContent) {
+        } else if (o.isDirectory() && req.getGrouping() == DIR && req.isDestContent()) {
             ff = addAsContent(executionContext, session, o, ff);
         // ------------------------------
-        }else if (o.isDirectory() && req.groupping == Groupping.DIR && !req.isDestContent) {
+        } else if (o.isDirectory() && req.getGrouping() == DIR && !req.isDestContent()) {
             ff = addAsAttributes(session, o, ff);
             ff = addFullTreeToAttribute(session, o, ff);
         // ------------------------------
-        }else if (o.isDirectory() && req.groupping == Groupping.ALL && req.isDestContent) {
+        } else if (o.isDirectory() && req.getGrouping() == ALL && req.isDestContent()) {
             ff = addAsContent(executionContext, session, o, ff);
         // ------------------------------
-        }else if (o.isDirectory() && req.groupping == Groupping.ALL && !req.isDestContent) {
+        } else if (o.isDirectory() && req.getGrouping() == ALL && !req.isDestContent()) {
             ff = addAsAttributes(session, o, ff);
             ff = addFullTreeToAttribute(session, o, ff);
-        }else {
+        } else {
             getLogger().error("Illegal State!");
             session.remove(ff);
-            return null;
+            return;
         }
         executionContext.flowfile = ff;
-        finishProcessing(executionContext, session);
+        finishProcessing(req, executionContext, session);
-        return o;
     }
     private FlowFile getReadyFlowFile(ExecutionContext executionContext, ProcessSession session, FlowFile origFF) {
@@ -547,13 +520,12 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
         return executionContext.flowfile;
     }
-    private void finishProcessing(ExecutionContext executionContext, ProcessSession session) {
+    private void finishProcessing(HDFSFileInfoRequest req, ExecutionContext executionContext, ProcessSession session) {
         executionContext.nrOfWaitingHDFSObjects++;
-        if (req.groupping == Groupping.NONE && req.isDestContent && executionContext.nrOfWaitingHDFSObjects < req.batchSize) {
+        if (req.grouping == NONE && req.isDestContent() && executionContext.nrOfWaitingHDFSObjects < req.getBatchSize()) {
             return;
         }
         session.transfer(executionContext.flowfile, REL_SUCCESS);
         executionContext.reset();
@@ -608,81 +580,39 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
     * Creates internal request object and initialize the fields that won't be changed every call (onTrigger).
     * Dynamic fields will be updated per each call separately.
    */
-    protected HDFSFileInfoRequest buildRequestDetails(ProcessContext context, ProcessSession session, FlowFile ff) {
+    protected HDFSFileInfoRequest buildRequestDetails(ProcessContext context, FlowFile ff) {
         HDFSFileInfoRequest req = new HDFSFileInfoRequest();
-        req.fullPath = context.getProperty(FULL_PATH).evaluateAttributeExpressions(ff).getValue();
+        req.setFullPath(context.getProperty(FULL_PATH).evaluateAttributeExpressions(ff).getValue());
-        req.isRecursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+        req.setRecursive(context.getProperty(RECURSE_SUBDIRS).asBoolean());
-        PropertyValue pv = null;
+        PropertyValue pv;
-        String v = null;
+        String v;
-        if (context.getProperty(DIR_FILTER).isSet() && (pv=context.getProperty(DIR_FILTER).evaluateAttributeExpressions(ff))!=null) {
+        if (context.getProperty(DIR_FILTER).isSet() && (pv = context.getProperty(DIR_FILTER).evaluateAttributeExpressions(ff)) != null) {
             v = pv.getValue();
-            req.dirFilter = v == null ? null : Pattern.compile(v);
+            req.setDirFilter(v == null ? null : Pattern.compile(v));
         }
-        if (context.getProperty(FILE_FILTER).isSet() && (pv=context.getProperty(FILE_FILTER).evaluateAttributeExpressions(ff))!=null) {
+        if (context.getProperty(FILE_FILTER).isSet() && (pv = context.getProperty(FILE_FILTER).evaluateAttributeExpressions(ff)) != null) {
             v = pv.getValue();
-            req.fileFilter = v == null ? null : Pattern.compile(v);
+            req.setFileFilter(v == null ? null : Pattern.compile(v));
         }
-        if (context.getProperty(FILE_EXCLUDE_FILTER).isSet() && (pv=context.getProperty(FILE_EXCLUDE_FILTER).evaluateAttributeExpressions(ff))!=null) {
+        if (context.getProperty(FILE_EXCLUDE_FILTER).isSet()
+                && (pv = context.getProperty(FILE_EXCLUDE_FILTER).evaluateAttributeExpressions(ff)) != null) {
             v = pv.getValue();
-            req.fileExcludeFilter = v == null ? null : Pattern.compile(v);
+            req.setFileExcludeFilter(v == null ? null : Pattern.compile(v));
         }
-        req.isIgnoreDotFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
+        req.setIgnoreDotFiles(context.getProperty(IGNORE_DOTTED_FILES).asBoolean());
-        req.isIgnoreDotDirs = context.getProperty(IGNORE_DOTTED_DIRS).asBoolean();
+        req.setIgnoreDotDirs(context.getProperty(IGNORE_DOTTED_DIRS).asBoolean());
-        req.groupping = HDFSFileInfoRequest.Groupping.getEnum(context.getProperty(GROUPING).getValue());
+        req.setGrouping(HDFSFileInfoRequest.Grouping.getEnum(context.getProperty(GROUPING).getValue()));
-        req.batchSize = Optional.ofNullable(context.getProperty(BATCH_SIZE))
-                .filter(propertyValue -> propertyValue.getValue() != null)
-                .map(PropertyValue::asInteger)
-                .orElse(1);
+        req.setBatchSize(context.getProperty(BATCH_SIZE).asInteger() != null ? context.getProperty(BATCH_SIZE).asInteger() : 1);
         v = context.getProperty(DESTINATION).getValue();
-        if (DESTINATION_CONTENT.getValue().equals(v)) {
-            req.isDestContent = true;
-        }else {
-            req.isDestContent = false;
-        }
+        req.setDestContent(DESTINATION_CONTENT.getValue().equals(v));
         return req;
     }
-    /*
-     * Creates internal request object if not created previously, and updates it with dynamic property every time onTrigger is called.
-     * Avoids creating regex Patter objects unless their actual value are changed due to evaluation of EL
-     */
-    protected HDFSFileInfoRequest updateRequestDetails(ProcessContext context, ProcessSession session, FlowFile ff) {
-        if (req == null) {
-            return buildRequestDetails(context, session, ff);
-        }
-        req.fullPath = context.getProperty(FULL_PATH).evaluateAttributeExpressions(ff).getValue();
-        String currValue = null;
-        String oldValue = null;
-        currValue = context.getProperty(DIR_FILTER).evaluateAttributeExpressions(ff).getValue();
-        oldValue = req.dirFilter == null ? null : req.dirFilter.toString();
-        if (StringUtils.compare(currValue, oldValue) != 0) {
-            req.dirFilter = currValue == null ? null : Pattern.compile(currValue);
-        }
-        currValue = context.getProperty(FILE_FILTER).evaluateAttributeExpressions(ff).getValue();
-        oldValue = req.fileFilter == null ? null : req.fileFilter.toString();
-        if (StringUtils.compare(currValue, oldValue) != 0) {
-            req.fileFilter = currValue == null ? null : Pattern.compile(currValue);
-        }
-        currValue = context.getProperty(FILE_EXCLUDE_FILTER).evaluateAttributeExpressions(ff).getValue();
-        oldValue = req.fileExcludeFilter == null ? null : req.fileExcludeFilter.toString();
-        if (StringUtils.compare(currValue, oldValue) != 0) {
-            req.fileExcludeFilter = currValue == null ? null : Pattern.compile(currValue);
-        }
-        return req;
-    }
@@ -707,15 +637,15 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
     /*
     * Keeps all request details in single object.
    */
-    static class HDFSFileInfoRequest{
+    static class HDFSFileInfoRequest {
-        enum Groupping {
+        enum Grouping {
             ALL(GROUP_ALL.getValue()),
             DIR(GROUP_PARENT_DIR.getValue()),
             NONE(GROUP_NONE.getValue());
-            private String val;
+            final private String val;
-            Groupping(String val){
+            Grouping(String val) {
                 this.val = val;
             }
@@ -723,8 +653,8 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
                 return this.val;
             }
-            public static Groupping getEnum(String value) {
+            public static Grouping getEnum(String value) {
-                for (Groupping v : values()) {
+                for (Grouping v : values()) {
                     if (v.val.equals(value)) {
                         return v;
                     }
@@ -733,23 +663,103 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
            }
        }
-        String fullPath;
+        private String fullPath;
-        boolean isRecursive;
+        private boolean recursive;
-        Pattern dirFilter;
+        private Pattern dirFilter;
-        Pattern fileFilter;
+        private Pattern fileFilter;
-        Pattern fileExcludeFilter;
+        private Pattern fileExcludeFilter;
-        boolean isIgnoreDotFiles;
+        private boolean ignoreDotFiles;
-        boolean isIgnoreDotDirs;
+        private boolean ignoreDotDirs;
-        boolean isDestContent;
+        private boolean destContent;
-        Groupping groupping;
+        private Grouping grouping;
-        int batchSize;
+        private int batchSize;
+        String getFullPath() {
+            return fullPath;
+        }
+        void setFullPath(String fullPath) {
+            this.fullPath = fullPath;
+        }
+        boolean isRecursive() {
+            return this.recursive;
+        }
+        void setRecursive(boolean recursive) {
+            this.recursive = recursive;
+        }
+        Pattern getDirFilter() {
+            return this.dirFilter;
+        }
+        void setDirFilter(Pattern dirFilter) {
+            this.dirFilter = dirFilter;
+        }
+        Pattern getFileFilter() {
+            return fileFilter;
+        }
+        void setFileFilter(Pattern fileFilter) {
+            this.fileFilter = fileFilter;
+        }
+        Pattern getFileExcludeFilter() {
+            return fileExcludeFilter;
+        }
+        void setFileExcludeFilter(Pattern fileExcludeFilter) {
+            this.fileExcludeFilter = fileExcludeFilter;
+        }
+        boolean isIgnoreDotFiles() {
+            return ignoreDotFiles;
+        }
+        void setIgnoreDotFiles(boolean ignoreDotFiles) {
+            this.ignoreDotFiles = ignoreDotFiles;
+        }
+        boolean isIgnoreDotDirs() {
+            return this.ignoreDotDirs;
+        }
+        void setIgnoreDotDirs(boolean ignoreDotDirs) {
+            this.ignoreDotDirs = ignoreDotDirs;
+        }
+        boolean isDestContent() {
+            return this.destContent;
+        }
+        void setDestContent(boolean destContent) {
+            this.destContent = destContent;
+        }
+        Grouping getGrouping() {
+            return grouping;
+        }
+        void setGrouping(Grouping grouping) {
+            this.grouping = grouping;
+        }
+        int getBatchSize() {
+            return this.batchSize;
+        }
+        void setBatchSize(int batchSize) {
+            this.batchSize = batchSize;
+        }
     }
     /*
     * Keeps details of HDFS objects.
     * This class is based on FileStatus and adds additional feature/properties for count, total size of directories, and subtrees/hierarchy of recursive listings.
    */
-    class HDFSObjectInfoDetails extends FileStatus{
+    class HDFSObjectInfoDetails extends FileStatus {
         private long countFiles;
         private long countDirs = 1;
@@ -757,7 +767,7 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
         private Collection<HDFSObjectInfoDetails> children = new LinkedList<>();
         private String error;
-        HDFSObjectInfoDetails(FileStatus fs) throws IOException{
+        HDFSObjectInfoDetails(FileStatus fs) throws IOException {
             super(fs);
         }
@@ -812,17 +822,17 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
                 this.totalLen = 0;
             }
-            for(HDFSObjectInfoDetails c : children) {
+            for (HDFSObjectInfoDetails c : children) {
                 if (c.isSymlink()) {
                     continue; //do not count symlinks. they either will be counted under their actual directories, or won't be count if actual location is not under provided root for scan.
-                }else if (c.isDirectory()) {
+                } else if (c.isDirectory()) {
                     if (deepUpdate) {
                         c.updateTotals(deepUpdate);
                     }
                     this.totalLen += c.totalLen;
                     this.countDirs += c.countDirs;
                     this.countFiles += c.countFiles;
-                }else if (c.isFile()) {
+                } else if (c.isFile()) {
                     this.totalLen += c.getLen();
                     this.countFiles++;
                 }
@@ -833,7 +843,7 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
         /*
         * Since, by definition, FF will keep only attributes for parent/single object, we don't need to recurse the children
        */
-        public Map<String, String> toAttributesMap(){
+        public Map<String, String> toAttributesMap() {
            Map<String, String> map = new HashMap<>();
            map.put("hdfs.objectName", this.getPath().getName());
@@ -892,14 +902,14 @@ public class GetHDFSFileInfo extends AbstractHadoopProcessor {
                 for (HDFSObjectInfoDetails c : this.getChildren()) {
                     c.toJsonString(sb).append(",");
                 }
-                sb.deleteCharAt(sb.length()-1).append("]");
+                sb.deleteCharAt(sb.length() - 1).append("]");
             }
             sb.append("}");
             return sb;
         }
         private StringBuilder appendProperty(StringBuilder sb, String name, String value) {
-            return sb.append("\"").append(name).append("\":\"").append(value == null? "": value).append("\"");
+            return sb.append("\"").append(name).append("\":\"").append(value == null ? "" : value).append("\"");
         }
     }
 }