NIFI-6049: Updated MoveHDFS to support FF Attribute for Output Directory (#3433)

NIFI-6049: Updated MoveHDFS to support FF Attribute for Output Directory
This commit is contained in:
Sivaprasanna 2019-04-22 21:42:16 +05:30 committed by Peter Wicks
parent 161e4b5763
commit 94c2b1e76e
1 changed files with 5 additions and 11 deletions

View File

@ -152,7 +152,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
.description("The HDFS directory where the files will be moved to") .description("The HDFS directory where the files will be moved to")
.required(true) .required(true)
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR) .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build(); .build();
public static final PropertyDescriptor OPERATION = new PropertyDescriptor.Builder() public static final PropertyDescriptor OPERATION = new PropertyDescriptor.Builder()
@ -243,7 +243,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
final FileSystem hdfs = getFileSystem(); final FileSystem hdfs = getFileSystem();
final String filenameValue = context.getProperty(INPUT_DIRECTORY_OR_FILE).evaluateAttributeExpressions(flowFile).getValue(); final String filenameValue = context.getProperty(INPUT_DIRECTORY_OR_FILE).evaluateAttributeExpressions(flowFile).getValue();
Path inputPath = null; Path inputPath;
try { try {
inputPath = new Path(filenameValue); inputPath = new Path(filenameValue);
if (!hdfs.exists(inputPath)) { if (!hdfs.exists(inputPath)) {
@ -257,7 +257,7 @@ public class MoveHDFS extends AbstractHadoopProcessor {
return; return;
} }
List<Path> files = new ArrayList<Path>(); List<Path> files = new ArrayList<>();
try { try {
final StopWatch stopWatch = new StopWatch(true); final StopWatch stopWatch = new StopWatch(true);
@ -348,7 +348,8 @@ public class MoveHDFS extends AbstractHadoopProcessor {
FlowFile flowFile = session.create(parentFlowFile); FlowFile flowFile = session.create(parentFlowFile);
try { try {
final String originalFilename = file.getName(); final String originalFilename = file.getName();
final Path configuredRootOutputDirPath = processorConfig.getOutputDirectory(); final String outputDirValue = context.getProperty(OUTPUT_DIRECTORY).evaluateAttributeExpressions(parentFlowFile).getValue();
final Path configuredRootOutputDirPath = new Path(outputDirValue);
final Path newFile = new Path(configuredRootOutputDirPath, originalFilename); final Path newFile = new Path(configuredRootOutputDirPath, originalFilename);
final boolean destinationExists = hdfs.exists(newFile); final boolean destinationExists = hdfs.exists(newFile);
// If destination file already exists, resolve that // If destination file already exists, resolve that
@ -502,15 +503,12 @@ public class MoveHDFS extends AbstractHadoopProcessor {
final private String conflictResolution; final private String conflictResolution;
final private String operation; final private String operation;
final private Path outputRootDirPath;
final private Pattern fileFilterPattern; final private Pattern fileFilterPattern;
final private boolean ignoreDottedFiles; final private boolean ignoreDottedFiles;
ProcessorConfiguration(final ProcessContext context) { ProcessorConfiguration(final ProcessContext context) {
conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue(); conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
operation = context.getProperty(OPERATION).getValue(); operation = context.getProperty(OPERATION).getValue();
final String outputDirValue = context.getProperty(OUTPUT_DIRECTORY).evaluateAttributeExpressions().getValue();
outputRootDirPath = new Path(outputDirValue);
final String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue(); final String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue();
fileFilterPattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex); fileFilterPattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex);
ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean(); ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
@ -524,10 +522,6 @@ public class MoveHDFS extends AbstractHadoopProcessor {
return conflictResolution; return conflictResolution;
} }
public Path getOutputDirectory() {
return outputRootDirPath;
}
protected PathFilter getPathFilter(final Path dir) { protected PathFilter getPathFilter(final Path dir) {
return new PathFilter() { return new PathFilter() {