diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java index a2292f541e..95bcdeb11c 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java @@ -152,7 +152,7 @@ public class MoveHDFS extends AbstractHadoopProcessor { .description("The HDFS directory where the files will be moved to") .required(true) .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); public static final PropertyDescriptor OPERATION = new PropertyDescriptor.Builder() @@ -243,7 +243,7 @@ public class MoveHDFS extends AbstractHadoopProcessor { final FileSystem hdfs = getFileSystem(); final String filenameValue = context.getProperty(INPUT_DIRECTORY_OR_FILE).evaluateAttributeExpressions(flowFile).getValue(); - Path inputPath = null; + Path inputPath; try { inputPath = new Path(filenameValue); if (!hdfs.exists(inputPath)) { @@ -257,7 +257,7 @@ public class MoveHDFS extends AbstractHadoopProcessor { return; } - List files = new ArrayList(); + List files = new ArrayList<>(); try { final StopWatch stopWatch = new StopWatch(true); @@ -348,7 +348,8 @@ public class MoveHDFS extends AbstractHadoopProcessor { FlowFile flowFile = session.create(parentFlowFile); try { final String originalFilename = file.getName(); - final Path configuredRootOutputDirPath = processorConfig.getOutputDirectory(); + final String outputDirValue = context.getProperty(OUTPUT_DIRECTORY).evaluateAttributeExpressions(parentFlowFile).getValue(); + final Path configuredRootOutputDirPath = new Path(outputDirValue); final Path newFile = new Path(configuredRootOutputDirPath, originalFilename); final boolean destinationExists = hdfs.exists(newFile); // If destination file already exists, resolve that @@ -502,15 +503,12 @@ public class MoveHDFS extends AbstractHadoopProcessor { final private String conflictResolution; final private String operation; - final private Path outputRootDirPath; final private Pattern fileFilterPattern; final private boolean ignoreDottedFiles; ProcessorConfiguration(final ProcessContext context) { conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue(); operation = context.getProperty(OPERATION).getValue(); - final String outputDirValue = context.getProperty(OUTPUT_DIRECTORY).evaluateAttributeExpressions().getValue(); - outputRootDirPath = new Path(outputDirValue); final String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue(); fileFilterPattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex); ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean(); @@ -524,10 +522,6 @@ public class MoveHDFS extends AbstractHadoopProcessor { return conflictResolution; } - public Path getOutputDirectory() { - return outputRootDirPath; - } - protected PathFilter getPathFilter(final Path dir) { return new PathFilter() {