NIFI-5152: MoveHDFS now works even with no upstream connection

This closes #2681.

Signed-off-by: Bryan Bende <bbende@apache.org>
Author: zenfenan, 2018-05-06 09:46:19 +05:30 (committed by Bryan Bende)
parent 868808f4b4
commit 4544f3969d
1 changed file with 17 additions and 18 deletions


@@ -24,6 +24,8 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.behavior.Restricted;
 import org.apache.nifi.annotation.behavior.Restriction;
@@ -65,6 +67,7 @@ import java.util.regex.Pattern;
 /**
  * This processor renames files on HDFS.
  */
+@InputRequirement(Requirement.INPUT_ALLOWED)
 @Tags({"hadoop", "HDFS", "put", "move", "filesystem", "moveHDFS"})
 @CapabilityDescription("Rename existing files or a directory of files (non-recursive) on Hadoop Distributed File System (HDFS).")
 @ReadsAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.")
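For context: org.apache.nifi.annotation.behavior.InputRequirement tells the framework whether a processor may run without an upstream connection, and the added annotation is what lets MoveHDFS trigger as a source. A minimal sketch of the three options (the processor class name below is illustrative, not from this commit):

    import org.apache.nifi.annotation.behavior.InputRequirement;
    import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;

    // INPUT_REQUIRED:  the processor is only scheduled when an incoming connection exists.
    // INPUT_ALLOWED:   the processor runs with or without an incoming connection (used here).
    // INPUT_FORBIDDEN: incoming connections are not permitted (pure source processors).
    @InputRequirement(Requirement.INPUT_ALLOWED)
    public class MyOptionalInputProcessor extends AbstractProcessor { // hypothetical name

        @Override
        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
            // see the optional-input pattern in the onTrigger hunk below
        }
    }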
@@ -139,6 +142,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
             .name("Input Directory or File")
             .description("The HDFS directory from which files should be read, or a single file to read.")
             .defaultValue("${path}")
+            .required(true)
             .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
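The property is now required, with ${path} as its default, so when a FlowFile arrives from upstream the default resolves to that FlowFile's standard path attribute. Reassembling the hunk's lines into the full builder (the constant name INPUT_DIRECTORY_OR_FILE is taken from a later hunk; the rest is as shown above):

    public static final PropertyDescriptor INPUT_DIRECTORY_OR_FILE = new PropertyDescriptor.Builder()
            .name("Input Directory or File")
            .description("The HDFS directory from which files should be read, or a single file to read.")
            .defaultValue("${path}") // evaluated against FlowFile attributes per the scope below
            .required(true)
            .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();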
@@ -228,14 +232,16 @@ public class MoveHDFS extends AbstractHadoopProcessor {
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        // MoveHDFS
-        FlowFile parentFlowFile = session.get();
-        if (parentFlowFile == null) {
+        FlowFile flowFile = session.get();
+
+        if (flowFile == null && context.hasIncomingConnection()) {
             return;
         }
 
+        flowFile = (flowFile != null) ? flowFile : session.create();
+
         final FileSystem hdfs = getFileSystem();
-        final String filenameValue = context.getProperty(INPUT_DIRECTORY_OR_FILE).evaluateAttributeExpressions(parentFlowFile).getValue();
+        final String filenameValue = context.getProperty(INPUT_DIRECTORY_OR_FILE).evaluateAttributeExpressions(flowFile).getValue();
 
         Path inputPath = null;
         try {
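The heart of the fix is the standard NiFi idiom for optional input, pulled out of the hunk above for readability (a sketch of the new flow, not the full method):

    FlowFile flowFile = session.get();

    // An incoming connection exists but nothing is queued: wait for the next trigger.
    if (flowFile == null && context.hasIncomingConnection()) {
        return;
    }

    // No incoming connection at all: act as a source and synthesize a FlowFile so that
    // attribute-expression evaluation (e.g. the ${path} default) still has a target.
    flowFile = (flowFile != null) ? flowFile : session.create();

    final String filenameValue = context.getProperty(INPUT_DIRECTORY_OR_FILE)
            .evaluateAttributeExpressions(flowFile).getValue();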
@@ -244,10 +250,10 @@ public class MoveHDFS extends AbstractHadoopProcessor {
                 throw new IOException("Input Directory or File does not exist in HDFS");
             }
         } catch (Exception e) {
-            getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{filenameValue, parentFlowFile, e});
-            parentFlowFile = session.putAttribute(parentFlowFile, "hdfs.failure.reason", e.getMessage());
-            parentFlowFile = session.penalize(parentFlowFile);
-            session.transfer(parentFlowFile, REL_FAILURE);
+            getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{filenameValue, flowFile, e});
+            flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
             return;
         }
@@ -298,7 +304,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
             filePathQueue.drainTo(files);
             if (files.isEmpty()) {
                 // nothing to do!
-                session.remove(parentFlowFile);
+                session.remove(flowFile);
                 context.yield();
                 return;
             }
@@ -306,7 +312,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
             queueLock.unlock();
         }
 
-        processBatchOfFiles(files, context, session, parentFlowFile);
+        processBatchOfFiles(files, context, session, flowFile);
 
         queueLock.lock();
         try {
@@ -315,7 +321,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
             queueLock.unlock();
         }
 
-        session.remove(parentFlowFile);
+        session.remove(flowFile);
     }
 
     protected void processBatchOfFiles(final List<Path> files, final ProcessContext context,
@@ -496,7 +502,6 @@ public class MoveHDFS extends AbstractHadoopProcessor {
         final private String conflictResolution;
         final private String operation;
-        final private Path inputRootDirPath;
         final private Path outputRootDirPath;
         final private Pattern fileFilterPattern;
         final private boolean ignoreDottedFiles;
@@ -504,8 +509,6 @@ public class MoveHDFS extends AbstractHadoopProcessor {
         ProcessorConfiguration(final ProcessContext context) {
             conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
             operation = context.getProperty(OPERATION).getValue();
-            final String inputDirValue = context.getProperty(INPUT_DIRECTORY_OR_FILE).evaluateAttributeExpressions().getValue();
-            inputRootDirPath = new Path(inputDirValue);
             final String outputDirValue = context.getProperty(OUTPUT_DIRECTORY).evaluateAttributeExpressions().getValue();
             outputRootDirPath = new Path(outputDirValue);
             final String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue();
@@ -521,10 +524,6 @@ public class MoveHDFS extends AbstractHadoopProcessor {
             return conflictResolution;
         }
 
-        public Path getInput() {
-            return inputRootDirPath;
-        }
-
         public Path getOutputDirectory() {
             return outputRootDirPath;
         }
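The removal of inputRootDirPath and getInput() follows from the same change: the Input Directory or File property supports FLOWFILE_ATTRIBUTES expression scope, so its value can only be resolved against the FlowFile at hand inside onTrigger, not cached once when ProcessorConfiguration is built. A sketch of the difference (the "empty string" behavior for an unmatched ${path} is an assumption about expression-language defaults, not stated in the commit):

    // Before (removed): evaluated once, with no FlowFile available, so a default
    // like ${path} would typically resolve to an empty string.
    // final String inputDirValue =
    //         context.getProperty(INPUT_DIRECTORY_OR_FILE).evaluateAttributeExpressions().getValue();
    // inputRootDirPath = new Path(inputDirValue);

    // After: evaluated on every trigger against the (possibly synthesized) FlowFile.
    final String filenameValue = context.getProperty(INPUT_DIRECTORY_OR_FILE)
            .evaluateAttributeExpressions(flowFile).getValue();
    Path inputPath = new Path(filenameValue);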