NIFI-2553 Fixing handling of Paths in HDFS processors

Signed-off-by: Yolanda M. Davis <ymdavis@apache.org>

This closes #843
Authored by Bryan Bende on 2016-08-11 15:50:54 -04:00, committed by Yolanda M. Davis
parent ca5a7fbf09
commit fd0dd51ff5
12 changed files with 368 additions and 88 deletions
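
In short: the per-processor Directory properties move up to AbstractHadoopProcessor, the ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR in StandardValidators now actually checks Expression Language syntax, and Path construction from user-supplied values happens inside try/catch so bad values route to failure instead of throwing. As a hedged illustration only (the ExampleDescriptors class and the "Example Directory" property name below are invented, not part of this commit), a property declared against the strengthened validator looks roughly like this:

// Hypothetical sketch, not part of this commit: a directory-style property declared
// against the EL-aware validator that this change applies to the HDFS processors.
// The builder calls mirror the descriptors in the diffs below; the names are made up.
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.util.StandardValidators;

public class ExampleDescriptors {

    public static final PropertyDescriptor EXAMPLE_DIRECTORY = new PropertyDescriptor.Builder()
            .name("Example Directory")
            .description("An HDFS directory; may contain NiFi Expression Language")
            .required(true)
            // rejects malformed EL such as ${literal('x'):foo()} at validation time,
            // before the processor ever tries to build an org.apache.hadoop.fs.Path from it
            .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
            .expressionLanguageSupported(true)
            .build();
}

With a descriptor like that, an invalid expression fails validation up front (see the new tests in GetHDFSTest, PutHDFSTest, TestFetchHDFS, and TestListHDFS below), while values that validate but still produce an unusable Path are caught at runtime and routed to failure.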

StandardValidators.java

@@ -24,6 +24,7 @@ import java.nio.charset.UnsupportedCharsetException;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
@@ -279,15 +280,17 @@ public class StandardValidators {
         @Override
         public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
             if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
-                return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+                try {
+                    final String result = context.newExpressionLanguageCompiler().validateExpression(input, true);
+                    if (!StringUtils.isEmpty(result)) {
+                        return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(result).build();
+                    }
+                } catch (final Exception e) {
+                    return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(e.getMessage()).build();
+                }
             }
-            try {
-                context.newExpressionLanguageCompiler().compile(input);
-                return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
-            } catch (final Exception e) {
-                return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(e.getMessage()).build();
-            }
+            return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
         }
     };

AbstractHadoopProcessor.java

@@ -91,19 +91,35 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
     }
     // properties
-    public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder().name("Hadoop Configuration Resources")
+    public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
+            .name("Hadoop Configuration Resources")
             .description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop "
                     + "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.")
-            .required(false).addValidator(createMultipleFilesExistValidator()).build();
+            .required(false)
+            .addValidator(createMultipleFilesExistValidator())
+            .build();
-    public static final String DIRECTORY_PROP_NAME = "Directory";
+    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
+            .name("Directory")
+            .description("The HDFS directory from which files should be read")
+            .required(true)
+            .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
-    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("Compression codec").required(true)
-            .allowableValues(CompressionType.values()).defaultValue(CompressionType.NONE.toString()).build();
+    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
+            .name("Compression codec")
+            .required(true)
+            .allowableValues(CompressionType.values())
+            .defaultValue(CompressionType.NONE.toString())
+            .build();
-    public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder().name("Kerberos Relogin Period").required(false)
-            .description("Period of time which should pass before attempting a kerberos relogin").defaultValue("4 hours")
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+    public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder()
+            .name("Kerberos Relogin Period").required(false)
+            .description("Period of time which should pass before attempting a kerberos relogin")
+            .defaultValue("4 hours")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
     private static final Object RESOURCES_LOCK = new Object();
@@ -191,19 +207,8 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
             }
             HdfsResources resources = hdfsResources.get();
             if (resources.getConfiguration() == null) {
-                String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
-                final String dir;
-                final PropertyDescriptor directoryPropDescriptor = getPropertyDescriptor(DIRECTORY_PROP_NAME);
-                if (directoryPropDescriptor != null) {
-                    if (directoryPropDescriptor.isExpressionLanguageSupported()) {
-                        dir = context.getProperty(DIRECTORY_PROP_NAME).evaluateAttributeExpressions().getValue();
-                    } else {
-                        dir = context.getProperty(DIRECTORY_PROP_NAME).getValue();
-                    }
-                } else {
-                    dir = null;
-                }
-                resources = resetHDFSResources(configResources, dir == null ? "/" : dir, context);
+                final String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
+                resources = resetHDFSResources(configResources, context);
                 hdfsResources.set(resources);
             }
         } catch (IOException ex) {
@@ -249,7 +254,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
     /*
      * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
      */
-    HdfsResources resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException {
+    HdfsResources resetHDFSResources(String configResources, ProcessContext context) throws IOException {
        // org.apache.hadoop.conf.Configuration saves its current thread context class loader to use for threads that it creates
        // later to do I/O. We need this class loader to be the NarClassLoader instead of the magical
        // NarThreadContextClassLoader.
@@ -286,8 +291,10 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
                 }
             }
+            final Path workingDir = fs.getWorkingDirectory();
             getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
-                    new Object[] { fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)), fs.getDefaultReplication(new Path(dir)), config.toString() });
+                    new Object[]{workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), config.toString()});
             return new HdfsResources(config, fs, ugi);
         } finally {

FetchHDFS.java

@@ -54,13 +54,14 @@ import java.util.concurrent.TimeUnit;
         + "not be fetched from HDFS")
 @SeeAlso({ListHDFS.class, GetHDFS.class, PutHDFS.class})
 public class FetchHDFS extends AbstractHadoopProcessor {
     static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
             .name("HDFS Filename")
             .description("The name of the HDFS file to retrieve")
             .required(true)
             .expressionLanguageSupported(true)
             .defaultValue("${path}/${filename}")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
             .build();
     static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -102,9 +103,20 @@ public class FetchHDFS extends AbstractHadoopProcessor {
         }
         final FileSystem hdfs = getFileSystem();
-        final Path path = new Path(context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue());
-        final URI uri = path.toUri();
+        final String filenameValue = context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue();
+        Path path = null;
+        try {
+            path = new Path(filenameValue);
+        } catch (IllegalArgumentException e) {
+            getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, flowFile, e});
+            flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+        final URI uri = path.toUri();
         final StopWatch stopWatch = new StopWatch(true);
         try (final FSDataInputStream inStream = hdfs.open(path, 16384)) {
             flowFile = session.importFrom(inStream, flowFile);

GetHDFS.java

@@ -86,14 +86,6 @@ public class GetHDFS extends AbstractHadoopProcessor {
             .build();
     // properties
-    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
-            .name(DIRECTORY_PROP_NAME)
-            .description("The HDFS directory from which files should be read")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
     public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
             .name("Recurse Subdirectories")
             .description("Indicates whether to pull files from subdirectories of the HDFS directory")
@@ -224,6 +216,16 @@ public class GetHDFS extends AbstractHadoopProcessor {
                     .explanation(MIN_AGE.getName() + " cannot be greater than " + MAX_AGE.getName()).build());
         }
+        try {
+            new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
+        } catch (Exception e) {
+            problems.add(new ValidationResult.Builder()
+                    .valid(false)
+                    .subject("Directory")
+                    .explanation(e.getMessage())
+                    .build());
+        }
         return problems;
     }

ListHDFS.java

@@ -42,7 +42,6 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.hadoop.util.HDFSListing;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.JsonParseException;
@@ -89,6 +88,7 @@ import java.util.concurrent.TimeUnit;
         + "Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
 @SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
 public class ListHDFS extends AbstractHadoopProcessor {
     public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
             .name("Distributed Cache Service")
             .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node "
@@ -97,14 +97,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
             .identifiesControllerService(DistributedMapCacheClient.class)
             .build();
-    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
-            .name(DIRECTORY_PROP_NAME)
-            .description("The HDFS directory from which files should be read")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
     public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder()
             .name("Recurse Subdirectories")
             .description("Indicates whether to list files from subdirectories of the HDFS directory")
@@ -287,14 +279,14 @@ public class ListHDFS extends AbstractHadoopProcessor {
         // Pull in any file that is newer than the timestamp that we have.
         final FileSystem hdfs = getFileSystem();
         final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
-        final Path rootPath = new Path(directory);
         final Set<FileStatus> statuses;
         try {
+            final Path rootPath = new Path(directory);
             statuses = getStatuses(rootPath, recursive, hdfs);
             getLogger().debug("Found a total of {} files in HDFS", new Object[] {statuses.size()});
-        } catch (final IOException ioe) {
-            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {ioe});
+        } catch (final IOException | IllegalArgumentException e) {
+            getLogger().error("Failed to perform listing of HDFS due to {}", new Object[] {e});
             return;
         }

PutHDFS.java

@@ -96,13 +96,6 @@ public class PutHDFS extends AbstractHadoopProcessor {
             .build();
     // properties
-    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
-            .name(DIRECTORY_PROP_NAME)
-            .description("The parent HDFS directory to which files should be written")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
     public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
             .name("Conflict Resolution Strategy")
@@ -168,7 +161,10 @@ public class PutHDFS extends AbstractHadoopProcessor {
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         List<PropertyDescriptor> props = new ArrayList<>(properties);
-        props.add(DIRECTORY);
+        props.add(new PropertyDescriptor.Builder()
+                .fromPropertyDescriptor(DIRECTORY)
+                .description("The parent HDFS directory to which files should be written")
+                .build());
         props.add(CONFLICT_RESOLUTION);
         props.add(BLOCK_SIZE);
         props.add(BUFFER_SIZE);
@@ -212,27 +208,29 @@ public class PutHDFS extends AbstractHadoopProcessor {
             return;
         }
-        final Path configuredRootDirPath = new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue());
-        final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
-        final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
-        final long blockSize = blockSizeProp != null ? blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
-        final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
-        final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
-        final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
-        final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs
-                .getDefaultReplication(configuredRootDirPath);
-        final CompressionCodec codec = getCompressionCodec(context, configuration);
-        final String filename = codec != null
-                ? flowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
-                : flowFile.getAttribute(CoreAttributes.FILENAME.key());
         Path tempDotCopyFile = null;
         try {
+            final String dirValue = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+            final Path configuredRootDirPath = new Path(dirValue);
+            final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
+            final Double blockSizeProp = context.getProperty(BLOCK_SIZE).asDataSize(DataUnit.B);
+            final long blockSize = blockSizeProp != null ? blockSizeProp.longValue() : hdfs.getDefaultBlockSize(configuredRootDirPath);
+            final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
+            final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : configuration.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
+            final Integer replicationProp = context.getProperty(REPLICATION_FACTOR).asInteger();
+            final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs
+                    .getDefaultReplication(configuredRootDirPath);
+            final CompressionCodec codec = getCompressionCodec(context, configuration);
+            final String filename = codec != null
+                    ? flowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
+                    : flowFile.getAttribute(CoreAttributes.FILENAME.key());
             final Path tempCopyFile = new Path(configuredRootDirPath, "." + filename);
             final Path copyFile = new Path(configuredRootDirPath, filename);

AbstractHadoopTest.java

@@ -112,7 +112,7 @@ public class AbstractHadoopTest {
         SimpleHadoopProcessor processor = new SimpleHadoopProcessor(kerberosProperties);
         TestRunner runner = TestRunners.newTestRunner(processor);
         try {
-            processor.resetHDFSResources("src/test/resources/core-site-broken.xml", "/target", runner.getProcessContext());
+            processor.resetHDFSResources("src/test/resources/core-site-broken.xml", runner.getProcessContext());
             Assert.fail("Should have thrown SocketTimeoutException");
         } catch (IOException e) {
         }

GetHDFSSequenceFileTest.java

@@ -103,7 +103,7 @@ public class GetHDFSSequenceFileTest {
     public class TestableGetHDFSSequenceFile extends GetHDFSSequenceFile {
         @Override
-        HdfsResources resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException {
+        HdfsResources resetHDFSResources(String configResources, ProcessContext context) throws IOException {
             return hdfsResources;
         }

GetHDFSTest.java

@@ -196,6 +196,47 @@ public class GetHDFSTest {
         flowFile.assertContentEquals(expected);
     }
+
+    @Test
+    public void testDirectoryUsesValidEL() throws IOException {
+        GetHDFS proc = new TestableGetHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/${literal('testdata'):substring(0,8)}");
+        runner.setProperty(GetHDFS.FILE_FILTER_REGEX, ".*.zip");
+        runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
+        runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
+        runner.run();
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
+        InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
+        flowFile.assertContentEquals(expected);
+    }
+
+    @Test
+    public void testDirectoryUsesUnrecognizedEL() throws IOException {
+        GetHDFS proc = new TestableGetHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, "data_${literal('testing'):substring(0,4)%7D");
+        runner.setProperty(GetHDFS.FILE_FILTER_REGEX, ".*.zip");
+        runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
+        runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testDirectoryUsesInvalidEL() throws IOException {
+        GetHDFS proc = new TestableGetHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, "data_${literal('testing'):foo()}");
+        runner.setProperty(GetHDFS.FILE_FILTER_REGEX, ".*.zip");
+        runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
+        runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
+        runner.assertNotValid();
+    }
+
     private static class TestableGetHDFS extends GetHDFS {
         private final KerberosProperties testKerberosProperties;

PutHDFSTest.java

@@ -295,6 +295,73 @@ public class PutHDFSTest {
         fs.delete(p, true);
     }
+
+    @Test
+    public void testPutFileWhenDirectoryUsesValidELFunction() throws IOException {
+        // Refer to comment in the BeforeClass method for an explanation
+        assumeTrue(isNotWindows());
+
+        PutHDFS proc = new TestablePutHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, "target/data_${literal('testing'):substring(0,4)}");
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
+        try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) {
+            Map<String, String> attributes = new HashMap<String, String>();
+            attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
+            runner.enqueue(fis, attributes);
+            runner.run();
+        }
+
+        Configuration config = new Configuration();
+        FileSystem fs = FileSystem.get(config);
+
+        List<MockFlowFile> failedFlowFiles = runner
+                .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build());
+        assertTrue(failedFlowFiles.isEmpty());
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(fs.exists(new Path("target/test-classes/randombytes-1")));
+        assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+        assertEquals("target/data_test", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+    }
+
+    @Test
+    public void testPutFileWhenDirectoryUsesUnrecognizedEL() throws IOException {
+        // Refer to comment in the BeforeClass method for an explanation
+        assumeTrue(isNotWindows());
+
+        PutHDFS proc = new TestablePutHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+
+        // this value somehow causes NiFi to not even recognize the EL, and thus it returns successfully from calling
+        // evaluateAttributeExpressions and then tries to create a Path with the exact value below and blows up
+        runner.setProperty(PutHDFS.DIRECTORY, "data_${literal('testing'):substring(0,4)%7D");
+
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
+        try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) {
+            Map<String, String> attributes = new HashMap<String, String>();
+            attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
+            runner.enqueue(fis, attributes);
+            runner.run();
+        }
+
+        runner.assertAllFlowFilesTransferred(PutHDFS.REL_FAILURE);
+    }
+
+    @Test
+    public void testPutFileWhenDirectoryUsesInvalidEL() throws IOException {
+        // Refer to comment in the BeforeClass method for an explanation
+        assumeTrue(isNotWindows());
+
+        PutHDFS proc = new TestablePutHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+
+        // the validator should pick up the invalid EL
+        runner.setProperty(PutHDFS.DIRECTORY, "target/data_${literal('testing'):foo()}");
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
+        runner.assertNotValid();
+    }
+
     private boolean isNotWindows() {
         return !System.getProperty("os.name").startsWith("Windows");
     }

TestFetchHDFS.java (new file)

@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestFetchHDFS {
+
+    private TestRunner runner;
+    private TestableFetchHDFS proc;
+    private NiFiProperties mockNiFiProperties;
+    private KerberosProperties kerberosProperties;
+
+    @Before
+    public void setup() {
+        mockNiFiProperties = mock(NiFiProperties.class);
+        when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
+        kerberosProperties = KerberosProperties.create(mockNiFiProperties);
+
+        proc = new TestableFetchHDFS(kerberosProperties);
+        runner = TestRunners.newTestRunner(proc);
+    }
+
+    @Test
+    public void testFetchStaticFileThatExists() throws IOException {
+        final String file = "src/test/resources/testdata/randombytes-1";
+        runner.setProperty(FetchHDFS.FILENAME, file);
+        runner.enqueue(new String("trigger flow file"));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testFetchStaticFileThatDoesNotExist() throws IOException {
+        final String file = "src/test/resources/testdata/doesnotexist";
+        runner.setProperty(FetchHDFS.FILENAME, file);
+        runner.enqueue(new String("trigger flow file"));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchHDFS.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testFetchFileThatExistsFromIncomingFlowFile() throws IOException {
+        final String file = "src/test/resources/testdata/randombytes-1";
+        runner.setProperty(FetchHDFS.FILENAME, "${my.file}");
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put("my.file", file);
+
+        runner.enqueue(new String("trigger flow file"), attributes);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testFilenameWithValidEL() throws IOException {
+        final String file = "src/test/resources/testdata/${literal('randombytes-1')}";
+        runner.setProperty(FetchHDFS.FILENAME, file);
+        runner.enqueue(new String("trigger flow file"));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchHDFS.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testFilenameWithInvalidEL() throws IOException {
+        final String file = "src/test/resources/testdata/${literal('randombytes-1'):foo()}";
+        runner.setProperty(FetchHDFS.FILENAME, file);
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testFilenameWithUnrecognizedEL() throws IOException {
+        final String file = "data_${literal('testing'):substring(0,4)%7D";
+        runner.setProperty(FetchHDFS.FILENAME, file);
+        runner.enqueue(new String("trigger flow file"));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchHDFS.REL_FAILURE, 1);
+    }
+
+    private static class TestableFetchHDFS extends FetchHDFS {
+        private final KerberosProperties testKerberosProps;
+
+        public TestableFetchHDFS(KerberosProperties testKerberosProps) {
+            this.testKerberosProps = testKerberosProps;
+        }
+
+        @Override
+        protected KerberosProperties getKerberosProperties() {
+            return testKerberosProps;
+        }
+    }
+}

TestListHDFS.java

@@ -82,6 +82,47 @@ public class TestListHDFS {
         runner.setProperty(ListHDFS.DISTRIBUTED_CACHE_SERVICE, "service");
     }
+
+    @Test
+    public void testListingWithValidELFunction() throws InterruptedException {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        runner.setProperty(ListHDFS.DIRECTORY, "${literal('/test'):substring(0,5)}");
+
+        // first iteration will not pick up files because it has to instead check timestamps.
+        // We must then wait long enough to ensure that the listing can be performed safely and
+        // run the Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 1);
+
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS).get(0);
+        mff.assertAttributeEquals("path", "/test");
+        mff.assertAttributeEquals("filename", "testFile.txt");
+    }
+
+    @Test
+    public void testListingWithInalidELFunction() throws InterruptedException {
+        runner.setProperty(ListHDFS.DIRECTORY, "${literal('/test'):foo()}");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testListingWithUnrecognizedELFunction() throws InterruptedException {
+        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+
+        runner.setProperty(ListHDFS.DIRECTORY, "data_${literal('testing'):substring(0,4)%7D");
+
+        // first iteration will not pick up files because it has to instead check timestamps.
+        // We must then wait long enough to ensure that the listing can be performed safely and
+        // run the Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+    }
+
     @Test
     public void testListingHasCorrectAttributes() throws InterruptedException {
         proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
@@ -287,13 +328,12 @@ public class TestListHDFS {
         }
     }
     private class MockFileSystem extends FileSystem {
         private final Map<Path, Set<FileStatus>> fileStatuses = new HashMap<>();
         public void addFileStatus(final Path parent, final FileStatus child) {
             Set<FileStatus> children = fileStatuses.get(parent);
-            if ( children == null ) {
+            if (children == null) {
                 children = new HashSet<>();
                 fileStatuses.put(parent, children);
             }
@@ -301,7 +341,6 @@ public class TestListHDFS {
             children.add(child);
         }
         @Override
         public long getDefaultBlockSize() {
             return 1024L;
@@ -324,7 +363,7 @@ public class TestListHDFS {
         @Override
         public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
                                          final long blockSize, final Progressable progress) throws IOException {
             return null;
         }
@@ -346,7 +385,7 @@ public class TestListHDFS {
         @Override
         public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException {
             final Set<FileStatus> statuses = fileStatuses.get(f);
-            if ( statuses == null ) {
+            if (statuses == null) {
                 return new FileStatus[0];
             }
@@ -375,7 +414,6 @@ public class TestListHDFS {
     }
     private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
         private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
         private boolean failOnCalls = false;