diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
index 47d5d5034e..5ceb952fa8 100644
--- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
@@ -24,6 +24,7 @@ import java.nio.charset.UnsupportedCharsetException;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
@@ -279,15 +280,17 @@ public class StandardValidators {
         @Override
         public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
             if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
-                return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+                try {
+                    final String result = context.newExpressionLanguageCompiler().validateExpression(input, true);
+                    if (!StringUtils.isEmpty(result)) {
+                        return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(result).build();
+                    }
+                } catch (final Exception e) {
+                    return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(e.getMessage()).build();
+                }
             }
 
-            try {
-                context.newExpressionLanguageCompiler().compile(input);
-                return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
-            } catch (final Exception e) {
-                return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(e.getMessage()).build();
-            }
+            return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
         }
     };
 
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 540c406bfd..93e0703cb8 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -91,19 +91,35 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
     }
 
     // properties
-    public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder().name("Hadoop Configuration Resources")
+    public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
+            .name("Hadoop Configuration Resources")
             .description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop "
                     + "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.")
-            .required(false).addValidator(createMultipleFilesExistValidator()).build();
+            .required(false)
+            .addValidator(createMultipleFilesExistValidator())
+            .build();
 
-    public static final String DIRECTORY_PROP_NAME = "Directory";
+    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
+            .name("Directory")
+            .description("The HDFS directory from which files should be read")
+            .required(true)
+            .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
 
-    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("Compression codec").required(true)
-            .allowableValues(CompressionType.values()).defaultValue(CompressionType.NONE.toString()).build();
+    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
+            .name("Compression codec")
+            .required(true)
+            .allowableValues(CompressionType.values())
+            .defaultValue(CompressionType.NONE.toString())
+            .build();
 
-    public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder().name("Kerberos Relogin Period").required(false)
-            .description("Period of time which should pass before attempting a kerberos relogin").defaultValue("4 hours")
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+    public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder()
+            .name("Kerberos Relogin Period").required(false)
+            .description("Period of time which should pass before attempting a kerberos relogin")
+            .defaultValue("4 hours")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
     private static final Object RESOURCES_LOCK = new Object();
@@ -191,19 +207,8 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
             }
             HdfsResources resources = hdfsResources.get();
             if (resources.getConfiguration() == null) {
-                String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
-                final String dir;
-                final PropertyDescriptor directoryPropDescriptor = getPropertyDescriptor(DIRECTORY_PROP_NAME);
-                if (directoryPropDescriptor != null) {
-                    if (directoryPropDescriptor.isExpressionLanguageSupported()) {
-                        dir = context.getProperty(DIRECTORY_PROP_NAME).evaluateAttributeExpressions().getValue();
-                    } else {
-                        dir = context.getProperty(DIRECTORY_PROP_NAME).getValue();
-                    }
-                } else {
-                    dir = null;
-                }
-                resources = resetHDFSResources(configResources, dir == null ? "/" : dir, context);
+                final String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
+                resources = resetHDFSResources(configResources, context);
                 hdfsResources.set(resources);
             }
         } catch (IOException ex) {
@@ -249,7 +254,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
     /*
      * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
      */
-    HdfsResources resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException {
+    HdfsResources resetHDFSResources(String configResources, ProcessContext context) throws IOException {
         // org.apache.hadoop.conf.Configuration saves its current thread context class loader to use for threads that it creates
         // later to do I/O. We need this class loader to be the NarClassLoader instead of the magical
         // NarThreadContextClassLoader.
@@ -286,8 +291,10 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
                 }
             }
 
+            final Path workingDir = fs.getWorkingDirectory();
             getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
-                    new Object[] { fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)), fs.getDefaultReplication(new Path(dir)), config.toString() });
+                    new Object[]{workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), config.toString()});
+
             return new HdfsResources(config, fs, ugi);
 
         } finally {
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index fdb2dcffdb..3b1cce2646 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -54,13 +54,14 @@ import java.util.concurrent.TimeUnit;
         + "not be fetched from HDFS")
 @SeeAlso({ListHDFS.class, GetHDFS.class, PutHDFS.class})
 public class FetchHDFS extends AbstractHadoopProcessor {
+
     static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
             .name("HDFS Filename")
             .description("The name of the HDFS file to retrieve")
             .required(true)
             .expressionLanguageSupported(true)
             .defaultValue("${path}/${filename}")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
             .build();
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -102,9 +103,20 @@ public class FetchHDFS extends AbstractHadoopProcessor {
         }
 
         final FileSystem hdfs = getFileSystem();
-        final Path path = new Path(context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue());
-        final URI uri = path.toUri();
+        final String filenameValue = context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue();
+        Path path = null;
+        try {
+            path = new Path(filenameValue);
+        } catch (IllegalArgumentException e) {
+            getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, flowFile, e});
+            flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final URI uri = path.toUri();
 
         final StopWatch stopWatch = new StopWatch(true);
         try (final FSDataInputStream inStream = hdfs.open(path, 16384)) {
             flowFile = session.importFrom(inStream, flowFile);
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index 2631840d2b..7ab7ebed12 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -86,14 +86,6 @@ public class GetHDFS extends AbstractHadoopProcessor {
             .build();
 
     // properties
-    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
-            .name(DIRECTORY_PROP_NAME)
-            .description("The HDFS directory from which files should be read")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
-
     public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
             .name("Recurse Subdirectories")
             .description("Indicates whether to pull files from subdirectories of the HDFS directory")
@@ -224,6 +216,16 @@ public class GetHDFS extends AbstractHadoopProcessor {
                     .explanation(MIN_AGE.getName() + " cannot be greater than " + MAX_AGE.getName()).build());
         }
 
+        try {
+            new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
+        } catch (Exception e) {
+            problems.add(new ValidationResult.Builder()
+                    .valid(false)
+                    .subject("Directory")
+                    .explanation(e.getMessage())
+                    .build());
+        }
+
         return problems;
     }
 
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 6d9f8f7503..2ae65b26cf 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -42,7 +42,6 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.hadoop.util.HDFSListing;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.JsonParseException;
@@ -89,6 +88,7 @@ import java.util.concurrent.TimeUnit;
         + "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
 @SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
 public class ListHDFS extends AbstractHadoopProcessor {
+
     public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
         .name("Distributed Cache Service")
         .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node "
@@ -97,14 +97,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
         .identifiesControllerService(DistributedMapCacheClient.class)
         .build();
 
-    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
-        .name(DIRECTORY_PROP_NAME)
-        .description("The HDFS directory from which files should be read")
-        .required(true)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(true)
-        .build();
-
     public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
         .name("Recurse Subdirectories")
         .description("Indicates whether to list files from subdirectories of the HDFS directory")
@@ -287,14 +279,14 @@ public class ListHDFS extends AbstractHadoopProcessor {
         // Pull in any file that is newer than the timestamp that we have.
         final FileSystem hdfs = getFileSystem();
         final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
-        final Path rootPath = new Path(directory);
 
         final Set<FileStatus> statuses;
         try {
+            final Path rootPath = new Path(directory);
             statuses = getStatuses(rootPath, recursive, hdfs);
             getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
-        } catch (final IOException ioe) {
-            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {ioe});
+        } catch (final IOException | IllegalArgumentException e) {
+            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e});
             return;
         }
 
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index f05c2c7458..3a0cb48a6a 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -96,13 +96,6 @@ public class PutHDFS extends AbstractHadoopProcessor {
             .build();
 
     // properties
-    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
-            .name(DIRECTORY_PROP_NAME)
-            .description("The parent HDFS directory to which files should be written")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
 
     public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
             .name("Conflict Resolution Strategy")
@@ -168,7 +161,10 @@ public class PutHDFS extends AbstractHadoopProcessor {
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         List<PropertyDescriptor> props = new ArrayList<>(properties);
-        props.add(DIRECTORY);
+        props.add(new PropertyDescriptor.Builder()
+                .fromPropertyDescriptor(DIRECTORY)
+                .description("The parent HDFS directory to which files should be written")
+                .build());
         props.add(CONFLICT_RESOLUTION);
         props.add(BLOCK_SIZE);
         props.add(BUFFER_SIZE);
@@ -212,27 +208,29 @@ public class PutHDFS extends AbstractHadoopProcessor {
             return;
         }
 
-        final Path configuredRootDirPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue());
-        final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
-
-        final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
-        final long blockSize = blockSizeProp != null ? blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
-
-        final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
-        final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
-
-        final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
-        final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs
-                .getDefaultReplication(configuredRootDirPath);
-
-        final CompressionCodec codec = getCompressionCodec(context, configuration);
-
-        final String filename = codec != null
-                ? flowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
-                : flowFile.getAttribute(CoreAttributes.FILENAME.key());
-
         Path tempDotCopyFile = null;
         try {
+            final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+            final Path configuredRootDirPath = new Path(dirValue);
+
+            final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
+
+            final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
+            final long blockSize = blockSizeProp != null ? blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
+
+            final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
+            final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
+
+            final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
+            final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs
+                    .getDefaultReplication(configuredRootDirPath);
+
+            final CompressionCodec codec = getCompressionCodec(context, configuration);
+
+            final String filename = codec != null
+                    ? flowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
+                    : flowFile.getAttribute(CoreAttributes.FILENAME.key());
+
             final Path tempCopyFile = new Path(configuredRootDirPath, "." + filename);
             final Path copyFile = new Path(configuredRootDirPath, filename);
 
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
index 76fc15dd67..9e2193d201 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/AbstractHadoopTest.java
@@ -112,7 +112,7 @@ public class AbstractHadoopTest {
         SimpleHadoopProcessor processor = new SimpleHadoopProcessor(kerberosProperties);
         TestRunner runner = TestRunners.newTestRunner(processor);
         try {
-            processor.resetHDFSResources("src/test/resources/core-site-broken.xml", "/target", runner.getProcessContext());
+            processor.resetHDFSResources("src/test/resources/core-site-broken.xml", runner.getProcessContext());
             Assert.fail("Should have thrown SocketTimeoutException");
         } catch (IOException e) {
         }
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
index 79f7f54d0e..691619543d 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSSequenceFileTest.java
@@ -103,7 +103,7 @@ public class GetHDFSSequenceFileTest {
 
     public class TestableGetHDFSSequenceFile extends GetHDFSSequenceFile {
         @Override
-        HdfsResources resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException {
+        HdfsResources resetHDFSResources(String configResources, ProcessContext context) throws IOException {
             return hdfsResources;
         }
 
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
index 64fe16fce5..582346a985 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
@@ -196,6 +196,47 @@ public class GetHDFSTest {
         flowFile.assertContentEquals(expected);
     }
 
+    @Test
+    public void testDirectoryUsesValidEL() throws IOException {
+        GetHDFS proc = new TestableGetHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/${literal('testdata'):substring(0,8)}");
+        runner.setProperty(GetHDFS.FILE_FILTER_REGEX, ".*.zip");
+        runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
+        runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
+        runner.run();
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
+        InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
+        flowFile.assertContentEquals(expected);
+    }
+
+    @Test
+    public void testDirectoryUsesUnrecognizedEL() throws IOException {
+        GetHDFS proc = new TestableGetHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, "data_${literal('testing'):substring(0,4)%7D");
+        runner.setProperty(GetHDFS.FILE_FILTER_REGEX, ".*.zip");
+        runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
+        runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testDirectoryUsesInvalidEL() throws IOException {
+        GetHDFS proc = new TestableGetHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, "data_${literal('testing'):foo()}");
+        runner.setProperty(GetHDFS.FILE_FILTER_REGEX, ".*.zip");
+        runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
+        runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
+        runner.assertNotValid();
+    }
+
     private static class TestableGetHDFS extends GetHDFS {
 
         private final KerberosProperties testKerberosProperties;
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index 34efcb2571..c8f8fb102e 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -295,6 +295,73 @@ public class PutHDFSTest {
         fs.delete(p, true);
     }
 
+    @Test
+    public void testPutFileWhenDirectoryUsesValidELFunction() throws IOException {
+        // Refer to comment in the BeforeClass method for an explanation
+        assumeTrue(isNotWindows());
+
+        PutHDFS proc = new TestablePutHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, "target/data_${literal('testing'):substring(0,4)}");
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
+        try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) {
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
+            runner.enqueue(fis, attributes);
+            runner.run();
+        }
+
+        Configuration config = new Configuration();
+        FileSystem fs = FileSystem.get(config);
+
+        List<MockFlowFile> failedFlowFiles = runner
+                .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build());
+        assertTrue(failedFlowFiles.isEmpty());
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(fs.exists(new Path("target/test-classes/randombytes-1")));
+        assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+        assertEquals("target/data_test", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+    }
+
+    @Test
+    public void testPutFileWhenDirectoryUsesUnrecognizedEL() throws IOException {
+        // Refer to comment in the BeforeClass method for an explanation
+        assumeTrue(isNotWindows());
+
+        PutHDFS proc = new TestablePutHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+
+        // this value somehow causes NiFi to not even recognize the EL, and thus it returns successfully from calling
+        // evaluateAttributeExpressions and then tries to create a Path with the exact value below and blows up
+        runner.setProperty(PutHDFS.DIRECTORY, "data_${literal('testing'):substring(0,4)%7D");
+
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
+        try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) {
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
+            runner.enqueue(fis, attributes);
+            runner.run();
+        }
+
+        runner.assertAllFlowFilesTransferred(PutHDFS.REL_FAILURE);
+    }
+
+    @Test
+    public void testPutFileWhenDirectoryUsesInvalidEL() throws IOException {
+        // Refer to comment in the BeforeClass method for an explanation
+        assumeTrue(isNotWindows());
+
+        PutHDFS proc = new TestablePutHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        // the validator should pick up the invalid EL
+        runner.setProperty(PutHDFS.DIRECTORY, "target/data_${literal('testing'):foo()}");
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
+        runner.assertNotValid();
+    }
+
     private boolean isNotWindows() {
         return !System.getProperty("os.name").startsWith("Windows");
     }
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
new file mode 100644
index 0000000000..e49975b630
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestFetchHDFS {
+
+    private TestRunner runner;
+    private TestableFetchHDFS proc;
+    private NiFiProperties mockNiFiProperties;
+    private KerberosProperties kerberosProperties;
+
+    @Before
+    public void setup() {
+        mockNiFiProperties = mock(NiFiProperties.class);
+        when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
+        kerberosProperties = KerberosProperties.create(mockNiFiProperties);
+
+        proc = new TestableFetchHDFS(kerberosProperties);
+        runner = TestRunners.newTestRunner(proc);
+    }
+
+    @Test
+    public void testFetchStaticFileThatExists() throws IOException {
+        final String file = "src/test/resources/testdata/randombytes-1";
+        runner.setProperty(FetchHDFS.FILENAME, file);
+        runner.enqueue(new String("trigger flow file"));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testFetchStaticFileThatDoesNotExist() throws IOException {
+        final String file = "src/test/resources/testdata/doesnotexist";
+        runner.setProperty(FetchHDFS.FILENAME, file);
+        runner.enqueue(new String("trigger flow file"));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchHDFS.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testFetchFileThatExistsFromIncomingFlowFile() throws IOException {
+        final String file = "src/test/resources/testdata/randombytes-1";
+        runner.setProperty(FetchHDFS.FILENAME, "${my.file}");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("my.file", file);
+
+        runner.enqueue(new String("trigger flow file"), attributes);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testFilenameWithValidEL() throws IOException {
+        final String file = "src/test/resources/testdata/${literal('randombytes-1')}";
+        runner.setProperty(FetchHDFS.FILENAME, file);
+        runner.enqueue(new String("trigger flow file"));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testFilenameWithInvalidEL() throws IOException {
+        final String file = "src/test/resources/testdata/${literal('randombytes-1'):foo()}";
+        runner.setProperty(FetchHDFS.FILENAME, file);
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testFilenameWithUnrecognizedEL() throws IOException {
+        final String file = "data_${literal('testing'):substring(0,4)%7D";
+        runner.setProperty(FetchHDFS.FILENAME, file);
+        runner.enqueue(new String("trigger flow file"));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchHDFS.REL_FAILURE, 1);
+    }
+
+    private static class TestableFetchHDFS extends FetchHDFS {
+        private final KerberosProperties testKerberosProps;
+
+        public TestableFetchHDFS(KerberosProperties testKerberosProps) {
+            this.testKerberosProps = testKerberosProps;
+        }
+
+        @Override
+        protected KerberosProperties getKerberosProperties() {
+            return testKerberosProps;
+        }
+
+
+    }
+}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
index 6fcea95149..d4204ea175 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -82,6 +82,47 @@ public class TestListHDFS {
         runner.setProperty(ListHDFS.DISTRIBUTED_CACHE_SERVICE, "service");
     }
 
+    @Test
+    public void testListingWithValidELFunction() throws InterruptedException {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        runner.setProperty(ListHDFS.DIRECTORY, "${literal('/test'):substring(0,5)}");
+
+        // first iteration will not pick up files because it has to instead check timestamps.
+        // We must then wait long enough to ensure that the listing can be performed safely and
+        // run the Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+        mff.assertAttributeEquals("path", "/test");
+        mff.assertAttributeEquals("filename", "testFile.txt");
+    }
+
+    @Test
+    public void testListingWithInvalidELFunction() throws InterruptedException {
+        runner.setProperty(ListHDFS.DIRECTORY, "${literal('/test'):foo()}");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testListingWithUnrecognizedELFunction() throws InterruptedException {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        runner.setProperty(ListHDFS.DIRECTORY, "data_${literal('testing'):substring(0,4)%7D");
+
+        // first iteration will not pick up files because it has to instead check timestamps.
+        // We must then wait long enough to ensure that the listing can be performed safely and
+        // run the Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+    }
+
     @Test
     public void testListingHasCorrectAttributes() throws InterruptedException {
         proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
@@ -287,13 +328,12 @@ public class TestListHDFS {
         }
     }
 
-
     private class MockFileSystem extends FileSystem {
         private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
 
         public void addFileStatus(final Path parent, final FileStatus child) {
             Set<FileStatus> children = fileStatuses.get(parent);
-            if ( children == null ) {
+            if (children == null) {
                 children = new HashSet<>();
                 fileStatuses.put(parent, children);
             }
@@ -301,7 +341,6 @@ public class TestListHDFS {
             children.add(child);
         }
 
-
        @Override
        public long getDefaultBlockSize() {
            return 1024L;
@@ -324,7 +363,7 @@ public class TestListHDFS {
 
        @Override
        public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
-                final long blockSize, final Progressable progress) throws IOException {
+                                         final long blockSize, final Progressable progress) throws IOException {
            return null;
        }
 
@@ -346,7 +385,7 @@ public class TestListHDFS {
        @Override
        public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException {
            final Set<FileStatus> statuses = fileStatuses.get(f);
-            if ( statuses == null ) {
+            if (statuses == null) {
                return new FileStatus[0];
            }
 
@@ -375,7 +414,6 @@ public class TestListHDFS {
        }
    }
 
-
    private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
        private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
        private boolean failOnCalls = false;
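For reviewers who want to see the validation behavior this patch standardizes on outside of the HDFS-specific tests, the following is a minimal, self-contained sketch (not part of the patch). It assumes the nifi-mock TestRunner framework already used by the tests above; SampleProcessor, its "Directory" property, and the test class name are hypothetical stand-ins for the real processors.

// Sketch only -- not part of the patch. Assumes the nifi-mock TestRunner framework used above;
// SampleProcessor and its "Directory" property are hypothetical stand-ins for the HDFS processors.
import java.util.Collections;
import java.util.List;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;

public class ExpressionLanguageValidationSketchTest {

    // Hypothetical processor with a single EL-enabled property guarded by the
    // validator that this patch switches the HDFS processors to.
    public static class SampleProcessor extends AbstractProcessor {
        static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
                .name("Directory")
                .required(true)
                .expressionLanguageSupported(true)
                .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
                .build();

        @Override
        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
            return Collections.singletonList(DIRECTORY);
        }

        @Override
        public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
            // no-op: only property validation matters for this sketch
        }
    }

    @Test
    public void malformedExpressionIsRejectedAtValidationTime() {
        final TestRunner runner = TestRunners.newTestRunner(new SampleProcessor());

        // A well-formed expression passes validation.
        runner.setProperty(SampleProcessor.DIRECTORY, "/data/${literal('in')}");
        runner.assertValid();

        // A malformed expression (unknown function) now fails validation up front,
        // instead of only surfacing at runtime when the expression is evaluated.
        runner.setProperty(SampleProcessor.DIRECTORY, "/data/${literal('in'):foo()}");
        runner.assertNotValid();
    }
}

This is the same pattern the new GetHDFSTest, PutHDFSTest, TestFetchHDFS, and TestListHDFS cases above exercise against the real processors.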