NIFI-3366 This closes #2332. Added parent/child flowfile relationship between the incoming flowfile and the files that are moved from the input directory to the output directory.

Updated to allow tests to check for evaluation of properties that support expression language.
Fixed bug with changeOwner attempting to operate on original file rather than the moved/copied file.
Added license header to MoveHDFSTest.java
Added test for moving a directory of files that contains a subdir, ensuring non-recursive behavior
Added to the description of the processor that it is non-recursive when a directory is used as input.
Added RAT exclude for test resource .dotfile to pom.xml

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Jeff Storck 2017-12-07 15:39:22 -05:00 committed by joewitt
parent 3731fbee88
commit 600586d6be
4 changed files with 618 additions and 551 deletions

View File

@ -66,4 +66,19 @@
<artifactId>nifi-properties</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes combine.children="append">
<exclude>src/test/resources/testdata/.dotfile</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -32,6 +32,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -60,145 +61,141 @@ import org.apache.nifi.util.StopWatch;
/**
* This processor renames files on HDFS.
*/
@Tags({ "hadoop", "HDFS", "put", "move", "filesystem", "restricted", "moveHDFS" })
@CapabilityDescription("Rename existing files on Hadoop Distributed File System (HDFS)")
@Tags({"hadoop", "HDFS", "put", "move", "filesystem", "restricted", "moveHDFS"})
@CapabilityDescription("Rename existing files or a directory of files (non-recursive) on Hadoop Distributed File System (HDFS).")
@ReadsAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.")
@WritesAttributes({
@WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS is stored in this attribute."),
@WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute.") })
@SeeAlso({ PutHDFS.class, GetHDFS.class })
@WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS is stored in this attribute."),
@WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute.")})
@SeeAlso({PutHDFS.class, GetHDFS.class})
public class MoveHDFS extends AbstractHadoopProcessor {
// static global
public static final int MAX_WORKING_QUEUE_SIZE = 25000;
public static final String REPLACE_RESOLUTION = "replace";
public static final String IGNORE_RESOLUTION = "ignore";
public static final String FAIL_RESOLUTION = "fail";
// static global
public static final String REPLACE_RESOLUTION = "replace";
public static final String IGNORE_RESOLUTION = "ignore";
public static final String FAIL_RESOLUTION = "fail";
private static final Set<Relationship> relationships;
private static final Set<Relationship> relationships;
public static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
REPLACE_RESOLUTION, "Replaces the existing file if any.");
public static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
"Failed rename operation stops processing and routes to success.");
public static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
"Failing to rename a file routes to failure.");
public static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
REPLACE_RESOLUTION, "Replaces the existing file if any.");
public static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
"Failed rename operation stops processing and routes to success.");
public static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION,
"Failing to rename a file routes to failure.");
public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
public static final int BUFFER_SIZE_DEFAULT = 4096;
public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
// relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("Files that have been successfully renamed on HDFS are transferred to this relationship")
.build();
// relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("Files that have been successfully renamed on HDFS are transferred to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("Files that could not be renamed on HDFS are transferred to this relationship").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("Files that could not be renamed on HDFS are transferred to this relationship").build();
// properties
public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
.name("Conflict Resolution Strategy")
.description(
"Indicates what should happen when a file with the same name already exists in the output directory")
.required(true).defaultValue(FAIL_RESOLUTION_AV.getValue())
.allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV).build();
// properties
public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
.name("Conflict Resolution Strategy")
.description(
"Indicates what should happen when a file with the same name already exists in the output directory")
.required(true).defaultValue(FAIL_RESOLUTION_AV.getValue())
.allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV).build();
public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder()
.name("File Filter Regex")
.description(
"A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular "
+ "Expression will be fetched, otherwise all files will be fetched")
.required(false).addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder()
.name("File Filter Regex")
.description(
"A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular "
+ "Expression will be fetched, otherwise all files will be fetched")
.required(false).addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder()
.name("Ignore Dotted Files")
.description("If true, files whose names begin with a dot (\".\") will be ignored").required(true)
.allowableValues("true", "false").defaultValue("true").build();
public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder()
.name("Ignore Dotted Files")
.description("If true, files whose names begin with a dot (\".\") will be ignored").required(true)
.allowableValues("true", "false").defaultValue("true").build();
public static final PropertyDescriptor INPUT_DIRECTORY_OR_FILE = new PropertyDescriptor.Builder()
.name("Input Directory or File")
.description("The HDFS directory from which files should be read, or a single file to read.")
.defaultValue("${path}").addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.expressionLanguageSupported(true).build();
public static final PropertyDescriptor INPUT_DIRECTORY_OR_FILE = new PropertyDescriptor.Builder()
.name("Input Directory or File")
.description("The HDFS directory from which files should be read, or a single file to read")
.defaultValue("${path}").addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.expressionLanguageSupported(true).build();
public static final PropertyDescriptor OUTPUT_DIRECTORY = new PropertyDescriptor.Builder().name("Output Directory")
.description("The HDFS directory where the files will be moved to").required(true)
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor OUTPUT_DIRECTORY = new PropertyDescriptor.Builder().name("Output Directory")
.description("The HDFS directory where the files will be moved to").required(true)
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor OPERATION = new PropertyDescriptor.Builder().name("HDFS Operation")
.description("The operation that will be performed on the source file").required(true)
.allowableValues("move", "copy").defaultValue("move").build();
public static final PropertyDescriptor OPERATION = new PropertyDescriptor.Builder().name("HDFS Operation")
.description("The operation that will be performed on the source file").required(true)
.allowableValues("move", "copy").defaultValue("move").build();
public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder().name("Remote Owner")
.description(
"Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final PropertyDescriptor REMOTE_OWNER = new PropertyDescriptor.Builder().name("Remote Owner")
.description(
"Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder().name("Remote Group")
.description(
"Changes the group of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change group")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder().name("Remote Group")
.description(
"Changes the group of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change group")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
static {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(rels);
}
static {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(rels);
}
// non-static global
protected ProcessorConfiguration processorConfig;
private final AtomicLong logEmptyListing = new AtomicLong(2L);
// non-static global
protected ProcessorConfiguration processorConfig;
private final AtomicLong logEmptyListing = new AtomicLong(2L);
private final Lock listingLock = new ReentrantLock();
private final Lock queueLock = new ReentrantLock();
private final Lock listingLock = new ReentrantLock();
private final Lock queueLock = new ReentrantLock();
private final BlockingQueue<Path> filePathQueue = new LinkedBlockingQueue<>();
private final BlockingQueue<Path> processing = new LinkedBlockingQueue<>();
private final BlockingQueue<Path> filePathQueue = new LinkedBlockingQueue<>();
private final BlockingQueue<Path> processing = new LinkedBlockingQueue<>();
// methods
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
// methods
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> props = new ArrayList<>(properties);
props.add(CONFLICT_RESOLUTION);
props.add(INPUT_DIRECTORY_OR_FILE);
props.add(OUTPUT_DIRECTORY);
props.add(OPERATION);
props.add(FILE_FILTER_REGEX);
props.add(IGNORE_DOTTED_FILES);
props.add(REMOTE_OWNER);
props.add(REMOTE_GROUP);
return props;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> props = new ArrayList<>(properties);
props.add(CONFLICT_RESOLUTION);
props.add(INPUT_DIRECTORY_OR_FILE);
props.add(OUTPUT_DIRECTORY);
props.add(OPERATION);
props.add(FILE_FILTER_REGEX);
props.add(IGNORE_DOTTED_FILES);
props.add(REMOTE_OWNER);
props.add(REMOTE_GROUP);
return props;
}
@OnScheduled
public void onScheduled(ProcessContext context) throws Exception {
super.abstractOnScheduled(context);
// copy configuration values to pass them around cleanly
processorConfig = new ProcessorConfiguration(context);
// forget the state of the queue in case HDFS contents changed while
// this processor was turned off
queueLock.lock();
try {
filePathQueue.clear();
processing.clear();
} finally {
queueLock.unlock();
}
}
@OnScheduled
public void onScheduled(ProcessContext context) throws Exception {
super.abstractOnScheduled(context);
// copy configuration values to pass them around cleanly
processorConfig = new ProcessorConfiguration(context);
// forget the state of the queue in case HDFS contents changed while
// this processor was turned off
queueLock.lock();
try {
filePathQueue.clear();
processing.clear();
} finally {
queueLock.unlock();
}
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
// MoveHDFS
FlowFile parentFlowFile = session.get();
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
// MoveHDFS
FlowFile parentFlowFile = session.get();
if (parentFlowFile == null) {
return;
}
@ -209,319 +206,318 @@ public class MoveHDFS extends AbstractHadoopProcessor {
Path inputPath = null;
try {
inputPath = new Path(filenameValue);
if(!hdfs.exists(inputPath)) {
throw new IOException("Input Directory or File does not exist in HDFS");
if (!hdfs.exists(inputPath)) {
throw new IOException("Input Directory or File does not exist in HDFS");
}
} catch (Exception e) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, parentFlowFile, e});
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{filenameValue, parentFlowFile, e});
parentFlowFile = session.putAttribute(parentFlowFile, "hdfs.failure.reason", e.getMessage());
parentFlowFile = session.penalize(parentFlowFile);
session.transfer(parentFlowFile, REL_FAILURE);
session.transfer(parentFlowFile, REL_FAILURE);
return;
}
List<Path> files = new ArrayList<Path>();
try {
final StopWatch stopWatch = new StopWatch(true);
Set<Path> listedFiles = performListing(context, inputPath);
stopWatch.stop();
final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
if (listedFiles != null) {
// place files into the work queue
int newItems = 0;
queueLock.lock();
try {
for (Path file : listedFiles) {
if (!filePathQueue.contains(file) && !processing.contains(file)) {
if (!filePathQueue.offer(file)) {
break;
}
newItems++;
}
}
} catch (Exception e) {
getLogger().warn("Could not add to processing queue due to {}", new Object[]{e.getMessage()}, e);
} finally {
queueLock.unlock();
}
if (listedFiles.size() > 0) {
logEmptyListing.set(3L);
}
if (logEmptyListing.getAndDecrement() > 0) {
getLogger().info(
"Obtained file listing in {} milliseconds; listing had {} items, {} of which were new",
new Object[]{millis, listedFiles.size(), newItems});
}
}
} catch (IOException e) {
context.yield();
getLogger().warn("Error while retrieving list of files due to {}", new Object[]{e});
return;
}
// prepare to process a batch of files in the queue
queueLock.lock();
try {
filePathQueue.drainTo(files);
if (files.isEmpty()) {
// nothing to do!
session.remove(parentFlowFile);
context.yield();
return;
}
} finally {
queueLock.unlock();
}
processBatchOfFiles(files, context, session, parentFlowFile);
queueLock.lock();
try {
processing.removeAll(files);
} finally {
queueLock.unlock();
}
session.remove(parentFlowFile);
}
List<Path> files = new ArrayList<Path>();
protected void processBatchOfFiles(final List<Path> files, final ProcessContext context,
final ProcessSession session, FlowFile parentFlowFile) {
Preconditions.checkState(parentFlowFile != null, "No parent flowfile for this batch was provided");
try {
final StopWatch stopWatch = new StopWatch(true);
Set<Path> listedFiles = performListing(context, inputPath);
stopWatch.stop();
final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
// process the batch of files
final Configuration conf = getConfiguration();
final FileSystem hdfs = getFileSystem();
final UserGroupInformation ugi = getUserGroupInformation();
if (listedFiles != null) {
// place files into the work queue
int newItems = 0;
queueLock.lock();
try {
for (Path file : listedFiles) {
if (!filePathQueue.contains(file) && !processing.contains(file)) {
if (!filePathQueue.offer(file)) {
break;
}
newItems++;
}
}
} catch (Exception e) {
getLogger().warn("Could not add to processing queue due to {}", new Object[] { e });
} finally {
queueLock.unlock();
}
if (listedFiles.size() > 0) {
logEmptyListing.set(3L);
}
if (logEmptyListing.getAndDecrement() > 0) {
getLogger().info(
"Obtained file listing in {} milliseconds; listing had {} items, {} of which were new",
new Object[] { millis, listedFiles.size(), newItems });
}
}
} catch (IOException e) {
context.yield();
getLogger().warn("Error while retrieving list of files due to {}", new Object[] { e });
return;
}
if (conf == null || ugi == null) {
getLogger().error("Configuration or UserGroupInformation not configured properly");
session.transfer(parentFlowFile, REL_FAILURE);
context.yield();
return;
}
// prepare to process a batch of files in the queue
queueLock.lock();
try {
filePathQueue.drainTo(files);
if (files.isEmpty()) {
// nothing to do!
context.yield();
return;
}
} finally {
queueLock.unlock();
}
for (final Path file : files) {
processBatchOfFiles(files, context, session);
ugi.doAs(new PrivilegedAction<Object>() {
@Override
public Object run() {
FlowFile flowFile = session.create(parentFlowFile);
try {
final String originalFilename = file.getName();
final Path configuredRootOutputDirPath = processorConfig.getOutputDirectory();
final Path newFile = new Path(configuredRootOutputDirPath, originalFilename);
final boolean destinationExists = hdfs.exists(newFile);
// If destination file already exists, resolve that
// based on processor configuration
if (destinationExists) {
switch (processorConfig.getConflictResolution()) {
case REPLACE_RESOLUTION:
if (hdfs.delete(file, false)) {
getLogger().info("deleted {} in order to replace with the contents of {}",
new Object[]{file, flowFile});
}
break;
case IGNORE_RESOLUTION:
session.transfer(flowFile, REL_SUCCESS);
getLogger().info(
"transferring {} to success because file with same name already exists",
new Object[]{flowFile});
return null;
case FAIL_RESOLUTION:
session.transfer(session.penalize(flowFile), REL_FAILURE);
getLogger().warn(
"penalizing {} and routing to failure because file with same name already exists",
new Object[]{flowFile});
return null;
default:
break;
}
}
queueLock.lock();
try {
processing.removeAll(files);
} finally {
queueLock.unlock();
}
}
// Create destination directory if it does not exist
try {
if (!hdfs.getFileStatus(configuredRootOutputDirPath).isDirectory()) {
throw new IOException(configuredRootOutputDirPath.toString()
+ " already exists and is not a directory");
}
} catch (FileNotFoundException fe) {
if (!hdfs.mkdirs(configuredRootOutputDirPath)) {
throw new IOException(configuredRootOutputDirPath.toString() + " could not be created");
}
changeOwner(context, hdfs, configuredRootOutputDirPath);
}
protected void processBatchOfFiles(final List<Path> files, final ProcessContext context,
final ProcessSession session) {
// process the batch of files
final Configuration conf = getConfiguration();
final FileSystem hdfs = getFileSystem();
final UserGroupInformation ugi = getUserGroupInformation();
boolean moved = false;
for (int i = 0; i < 10; i++) { // try to rename multiple
// times.
if (processorConfig.getOperation().equals("move")) {
if (hdfs.rename(file, newFile)) {
moved = true;
break;// rename was successful
}
} else {
if (FileUtil.copy(hdfs, file, hdfs, newFile, false, conf)) {
moved = true;
break;// copy was successful
}
}
Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
}
if (!moved) {
throw new ProcessException("Could not move file " + file + " to its final filename");
}
if (conf == null || ugi == null) {
getLogger().error("Configuration or UserGroupInformation not configured properly");
session.transfer(session.get(), REL_FAILURE);
context.yield();
return;
}
changeOwner(context, hdfs, newFile);
final String outputPath = newFile.toString();
final String newFilename = newFile.getName();
final String hdfsPath = newFile.getParent().toString();
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath
: "hdfs://" + outputPath;
session.getProvenanceReporter().send(flowFile, transitUri);
session.transfer(flowFile, REL_SUCCESS);
for (final Path file : files) {
} catch (final Throwable t) {
getLogger().error("Failed to rename on HDFS due to {}", new Object[]{t});
session.transfer(session.penalize(flowFile), REL_FAILURE);
context.yield();
}
return null;
}
});
}
}
ugi.doAs(new PrivilegedAction<Object>() {
@Override
public Object run() {
FlowFile flowFile = session.create();
try {
final String originalFilename = file.getName();
final Path configuredRootOutputDirPath = processorConfig.getOutputDirectory();
final Path newFile = new Path(configuredRootOutputDirPath, originalFilename);
final boolean destinationExists = hdfs.exists(newFile);
// If destination file already exists, resolve that
// based on processor configuration
if (destinationExists) {
switch (processorConfig.getConflictResolution()) {
case REPLACE_RESOLUTION:
if (hdfs.delete(file, false)) {
getLogger().info("deleted {} in order to replace with the contents of {}",
new Object[] { file, flowFile });
}
break;
case IGNORE_RESOLUTION:
session.transfer(flowFile, REL_SUCCESS);
getLogger().info(
"transferring {} to success because file with same name already exists",
new Object[] { flowFile });
return null;
case FAIL_RESOLUTION:
session.transfer(session.penalize(flowFile), REL_FAILURE);
getLogger().warn(
"penalizing {} and routing to failure because file with same name already exists",
new Object[] { flowFile });
return null;
default:
break;
}
}
protected Set<Path> performListing(final ProcessContext context, Path path) throws IOException {
Set<Path> listing = null;
// Create destination directory if it does not exist
try {
if (!hdfs.getFileStatus(configuredRootOutputDirPath).isDirectory()) {
throw new IOException(configuredRootOutputDirPath.toString()
+ " already exists and is not a directory");
}
} catch (FileNotFoundException fe) {
if (!hdfs.mkdirs(configuredRootOutputDirPath)) {
throw new IOException(configuredRootOutputDirPath.toString() + " could not be created");
}
changeOwner(context, hdfs, configuredRootOutputDirPath);
}
if (listingLock.tryLock()) {
try {
final FileSystem hdfs = getFileSystem();
// get listing
listing = selectFiles(hdfs, path, null);
} finally {
listingLock.unlock();
}
}
boolean moved = false;
for (int i = 0; i < 10; i++) { // try to rename multiple
// times.
if (processorConfig.getOperation().equals("move")) {
if (hdfs.rename(file, newFile)) {
moved = true;
break;// rename was successful
}
} else {
if (FileUtil.copy(hdfs, file, hdfs, newFile, false, conf)) {
moved = true;
break;// copy was successful
}
}
Thread.sleep(200L);// try waiting to let whatever
// might cause rename failure to
// resolve
}
if (!moved) {
throw new ProcessException("Could not move file " + file + " to its final filename");
}
return listing;
}
changeOwner(context, hdfs, file);
final String outputPath = newFile.toString();
final String newFilename = newFile.getName();
final String hdfsPath = newFile.getParent().toString();
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath
: "hdfs://" + outputPath;
session.getProvenanceReporter().send(flowFile, transitUri);
session.transfer(flowFile, REL_SUCCESS);
protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name) {
try {
// Change owner and group of file if configured to do so
String owner = context.getProperty(REMOTE_OWNER).getValue();
String group = context.getProperty(REMOTE_GROUP).getValue();
if (owner != null || group != null) {
hdfs.setOwner(name, owner, group);
}
} catch (Exception e) {
getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{name, e.getMessage()}, e);
}
}
} catch (final Throwable t) {
getLogger().error("Failed to rename on HDFS due to {}", new Object[] { t });
session.transfer(session.penalize(flowFile), REL_FAILURE);
context.yield();
}
return null;
}
});
}
}
protected Set<Path> selectFiles(final FileSystem hdfs, final Path inputPath, Set<Path> filesVisited)
throws IOException {
if (null == filesVisited) {
filesVisited = new HashSet<>();
}
protected Set<Path> performListing(final ProcessContext context, Path path) throws IOException {
Set<Path> listing = null;
if (!hdfs.exists(inputPath)) {
throw new IOException("Selection directory " + inputPath.toString() + " doesn't appear to exist!");
}
if (listingLock.tryLock()) {
try {
final FileSystem hdfs = getFileSystem();
// get listing
listing = selectFiles(hdfs, path, null);
} finally {
listingLock.unlock();
}
}
final Set<Path> files = new HashSet<>();
return listing;
}
FileStatus inputStatus = hdfs.getFileStatus(inputPath);
protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name) {
try {
// Change owner and group of file if configured to do so
String owner = context.getProperty(REMOTE_OWNER).getValue();
String group = context.getProperty(REMOTE_GROUP).getValue();
if (owner != null || group != null) {
hdfs.setOwner(name, owner, group);
}
} catch (Exception e) {
getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[] { name, e });
}
}
if (inputStatus.isDirectory()) {
for (final FileStatus file : hdfs.listStatus(inputPath)) {
final Path canonicalFile = file.getPath();
protected Set<Path> selectFiles(final FileSystem hdfs, final Path inputPath, Set<Path> filesVisited)
throws IOException {
if (null == filesVisited) {
filesVisited = new HashSet<>();
}
if (!filesVisited.add(canonicalFile)) { // skip files we've already seen (may be looping directory links)
continue;
}
if (!hdfs.exists(inputPath)) {
throw new IOException("Selection directory " + inputPath.toString() + " doesn't appear to exist!");
}
if (!file.isDirectory() && processorConfig.getPathFilter(inputPath).accept(canonicalFile)) {
files.add(canonicalFile);
final Set<Path> files = new HashSet<>();
if (getLogger().isDebugEnabled()) {
getLogger().debug(this + " selected file at path: " + canonicalFile.toString());
}
}
}
} else if (inputStatus.isFile()) {
files.add(inputPath);
}
return files;
}
FileStatus inputStatus = hdfs.getFileStatus(inputPath);
protected static class ProcessorConfiguration {
if (inputStatus.isDirectory()) {
for (final FileStatus file : hdfs.listStatus(inputPath)) {
final Path canonicalFile = file.getPath();
final private String conflictResolution;
final private String operation;
final private Path inputRootDirPath;
final private Path outputRootDirPath;
final private Pattern fileFilterPattern;
final private boolean ignoreDottedFiles;
if (!filesVisited.add(canonicalFile)) { // skip files we've
// already
// seen (may be looping
// directory links)
continue;
}
ProcessorConfiguration(final ProcessContext context) {
conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
operation = context.getProperty(OPERATION).getValue();
final String inputDirValue = context.getProperty(INPUT_DIRECTORY_OR_FILE).evaluateAttributeExpressions().getValue();
inputRootDirPath = new Path(inputDirValue);
final String outputDirValue = context.getProperty(OUTPUT_DIRECTORY).evaluateAttributeExpressions().getValue();
outputRootDirPath = new Path(outputDirValue);
final String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue();
fileFilterPattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex);
ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
}
if (!file.isDirectory() && processorConfig.getPathFilter(inputPath).accept(canonicalFile)) {
files.add(canonicalFile);
public String getOperation() {
return operation;
}
if (getLogger().isDebugEnabled()) {
getLogger().debug(this + " selected file at path: " + canonicalFile.toString());
}
}
}
} else if (inputStatus.isFile()) {
files.add(inputPath);
}
return files;
}
public String getConflictResolution() {
return conflictResolution;
}
protected static class ProcessorConfiguration {
public Path getInput() {
return inputRootDirPath;
}
final private String conflictResolution;
final private String operation;
final private Path inputRootDirPath;
final private Path outputRootDirPath;
final private Pattern fileFilterPattern;
final private boolean ignoreDottedFiles;
public Path getOutputDirectory() {
return outputRootDirPath;
}
ProcessorConfiguration(final ProcessContext context) {
conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
operation = context.getProperty(OPERATION).getValue();
final String inputDirValue = context.getProperty(INPUT_DIRECTORY_OR_FILE).getValue();
inputRootDirPath = new Path(inputDirValue);
final String outputDirValue = context.getProperty(OUTPUT_DIRECTORY).getValue();
outputRootDirPath = new Path(outputDirValue);
final String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue();
fileFilterPattern = (fileFilterRegex == null) ? null : Pattern.compile(fileFilterRegex);
ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
}
protected PathFilter getPathFilter(final Path dir) {
return new PathFilter() {
public String getOperation() {
return operation;
}
@Override
public boolean accept(Path path) {
if (ignoreDottedFiles && path.getName().startsWith(".")) {
return false;
}
final String pathToCompare;
String relativePath = getPathDifference(dir, path);
if (relativePath.length() == 0) {
pathToCompare = path.getName();
} else {
pathToCompare = relativePath + Path.SEPARATOR + path.getName();
}
public String getConflictResolution() {
return conflictResolution;
}
if (fileFilterPattern != null && !fileFilterPattern.matcher(pathToCompare).matches()) {
return false;
}
return true;
}
public Path getInput() {
return inputRootDirPath;
}
public Path getOutputDirectory() {
return outputRootDirPath;
}
protected PathFilter getPathFilter(final Path dir) {
return new PathFilter() {
@Override
public boolean accept(Path path) {
if (ignoreDottedFiles && path.getName().startsWith(".")) {
return false;
}
final String pathToCompare;
String relativePath = getPathDifference(dir, path);
if (relativePath.length() == 0) {
pathToCompare = path.getName();
} else {
pathToCompare = relativePath + Path.SEPARATOR + path.getName();
}
if (fileFilterPattern != null && !fileFilterPattern.matcher(pathToCompare).matches()) {
return false;
}
return true;
}
};
}
}
};
}
}
}

View File

@ -1,15 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hadoop;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.processor.ProcessContext;
@ -23,173 +30,222 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MoveHDFSTest {
private static final String OUTPUT_DIRECTORY = "src/test/resources/testdataoutput";
private static final String INPUT_DIRECTORY = "src/test/resources/testdata";
private static final String DOT_FILE_PATH = "src/test/resources/testdata/.testfordotfiles";
private NiFiProperties mockNiFiProperties;
private KerberosProperties kerberosProperties;
private static final String OUTPUT_DIRECTORY = "target/test-data-output";
private static final String TEST_DATA_DIRECTORY = "src/test/resources/testdata";
private static final String INPUT_DIRECTORY = "target/test-data-input";
private NiFiProperties mockNiFiProperties;
private KerberosProperties kerberosProperties;
@Before
public void setup() {
mockNiFiProperties = mock(NiFiProperties.class);
when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
kerberosProperties = new KerberosProperties(null);
}
@Before
public void setup() {
mockNiFiProperties = mock(NiFiProperties.class);
when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
kerberosProperties = new KerberosProperties(null);
}
@After
public void teardown() {
File outputDirectory = new File(OUTPUT_DIRECTORY);
if (outputDirectory.exists()) {
if (outputDirectory.isDirectory()) {
moveFilesFromOutputDirectoryToInput();
}
outputDirectory.delete();
}
removeDotFile();
}
@After
public void teardown() {
File inputDirectory = new File(INPUT_DIRECTORY);
File outputDirectory = new File(OUTPUT_DIRECTORY);
if (inputDirectory.exists()) {
Assert.assertTrue("Could not delete input directory: " + inputDirectory, FileUtils.deleteQuietly(inputDirectory));
}
if (outputDirectory.exists()) {
Assert.assertTrue("Could not delete output directory: " + outputDirectory, FileUtils.deleteQuietly(outputDirectory));
}
}
private void removeDotFile() {
File dotFile = new File(DOT_FILE_PATH);
if (dotFile.exists()) {
dotFile.delete();
}
}
@Test
public void testOutputDirectoryValidator() {
MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
Collection<ValidationResult> results;
ProcessContext pc;
private void moveFilesFromOutputDirectoryToInput() {
File folder = new File(OUTPUT_DIRECTORY);
for (File file : folder.listFiles()) {
if (file.isFile()) {
String path = file.getAbsolutePath();
if(!path.endsWith(".crc")) {
String newPath = path.replaceAll("testdataoutput", "testdata");
File newFile = new File(newPath);
if (!newFile.exists()) {
file.renameTo(newFile);
}
} else {
file.delete();
}
}
}
}
results = new HashSet<>();
runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, "/source");
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
assertTrue(vr.toString().contains("Output Directory is required"));
}
}
@Test
public void testOutputDirectoryValidator() {
MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
Collection<ValidationResult> results;
ProcessContext pc;
@Test
public void testBothInputAndOutputDirectoriesAreValid() {
MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
Collection<ValidationResult> results;
ProcessContext pc;
results = new HashSet<>();
runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, "/source");
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
assertTrue(vr.toString().contains("Output Directory is required"));
}
}
results = new HashSet<>();
runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(0, results.size());
}
@Test
public void testBothInputAndOutputDirectoriesAreValid() {
MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
Collection<ValidationResult> results;
ProcessContext pc;
@Test
public void testOnScheduledShouldRunCleanly() throws IOException {
FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), new File(INPUT_DIRECTORY));
MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
runner.enqueue(new byte[0]);
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
Assert.assertEquals(7, flowFiles.size());
}
results = new HashSet<>();
runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(0, results.size());
}
@Test
public void testDotFileFilterIgnore() throws IOException {
FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), new File(INPUT_DIRECTORY));
MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
runner.setProperty(MoveHDFS.IGNORE_DOTTED_FILES, "true");
runner.enqueue(new byte[0]);
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
Assert.assertEquals(7, flowFiles.size());
Assert.assertTrue(new File(INPUT_DIRECTORY, ".dotfile").exists());
}
@Test
public void testOnScheduledShouldRunCleanly() {
MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
runner.enqueue(new byte[0]);
runner.setValidateExpressionUsage(false);
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
Assert.assertEquals(7, flowFiles.size());
}
@Test
public void testDotFileFilterInclude() throws IOException {
FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), new File(INPUT_DIRECTORY));
MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
runner.setProperty(MoveHDFS.IGNORE_DOTTED_FILES, "false");
runner.enqueue(new byte[0]);
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
Assert.assertEquals(8, flowFiles.size());
}
@Test
public void testDotFileFilter() throws IOException {
createDotFile();
MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
runner.setProperty(MoveHDFS.IGNORE_DOTTED_FILES, "false");
runner.enqueue(new byte[0]);
runner.setValidateExpressionUsage(false);
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
Assert.assertEquals(8, flowFiles.size());
}
@Test
public void testFileFilterRegex() throws IOException {
FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), new File(INPUT_DIRECTORY));
MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
runner.setProperty(MoveHDFS.FILE_FILTER_REGEX, ".*\\.gz");
runner.enqueue(new byte[0]);
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
Assert.assertEquals(1, flowFiles.size());
}
@Test
public void testFileFilterRegex() {
MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
runner.setProperty(MoveHDFS.FILE_FILTER_REGEX, ".*\\.gz");
runner.enqueue(new byte[0]);
runner.setValidateExpressionUsage(false);
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
Assert.assertEquals(1, flowFiles.size());
}
@Test
public void testSingleFileAsInputCopy() throws IOException {
FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), new File(INPUT_DIRECTORY));
MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY + "/randombytes-1");
runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
runner.setProperty(MoveHDFS.OPERATION, "copy");
runner.enqueue(new byte[0]);
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
Assert.assertEquals(1, flowFiles.size());
Assert.assertTrue(new File(INPUT_DIRECTORY, "randombytes-1").exists());
Assert.assertTrue(new File(OUTPUT_DIRECTORY, "randombytes-1").exists());
}
@Test
public void testSingleFileAsInput() {
MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY + "/randombytes-1");
runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
runner.enqueue(new byte[0]);
runner.setValidateExpressionUsage(false);
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
Assert.assertEquals(1, flowFiles.size());
}
@Test
public void testSingleFileAsInputMove() throws IOException {
FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), new File(INPUT_DIRECTORY));
MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY + "/randombytes-1");
runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
runner.enqueue(new byte[0]);
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
Assert.assertEquals(1, flowFiles.size());
Assert.assertFalse(new File(INPUT_DIRECTORY, "randombytes-1").exists());
Assert.assertTrue(new File(OUTPUT_DIRECTORY, "randombytes-1").exists());
}
private void createDotFile() throws IOException {
File dotFile = new File(DOT_FILE_PATH);
dotFile.createNewFile();
}
@Test
public void testDirectoryWithSubDirectoryAsInputMove() throws IOException {
FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), new File(INPUT_DIRECTORY));
File subdir = new File(INPUT_DIRECTORY, "subdir");
FileUtils.copyDirectory(new File(TEST_DATA_DIRECTORY), subdir);
MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
runner.enqueue(new byte[0]);
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
Assert.assertEquals(7, flowFiles.size());
Assert.assertTrue(new File(INPUT_DIRECTORY).exists());
Assert.assertTrue(subdir.exists());
}
private static class TestableMoveHDFS extends MoveHDFS {
@Test
public void testEmptyInputDirectory() throws IOException {
MoveHDFS proc = new TestableMoveHDFS(kerberosProperties);
TestRunner runner = TestRunners.newTestRunner(proc);
Files.createDirectories(Paths.get(INPUT_DIRECTORY));
runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
runner.enqueue(new byte[0]);
Assert.assertEquals(0, Files.list(Paths.get(INPUT_DIRECTORY)).count());
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(MoveHDFS.REL_SUCCESS);
runner.assertAllFlowFilesTransferred(MoveHDFS.REL_SUCCESS);
Assert.assertEquals(0, flowFiles.size());
}
private KerberosProperties testKerberosProperties;
private static class TestableMoveHDFS extends MoveHDFS {
public TestableMoveHDFS(KerberosProperties testKerberosProperties) {
this.testKerberosProperties = testKerberosProperties;
}
private KerberosProperties testKerberosProperties;
@Override
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
return testKerberosProperties;
}
public TestableMoveHDFS(KerberosProperties testKerberosProperties) {
this.testKerberosProperties = testKerberosProperties;
}
}
@Override
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
return testKerberosProperties;
}
}
}