mirror of https://github.com/apache/nifi.git
NIFI-631: Added ListFile processor.
Reviewed by Tony Kurc (tkurc@apache.org)
This commit is contained in:
parent
5061e5fa0a
commit
4c4d62c61f
|
@ -0,0 +1,367 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileFilter;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.FileStore;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.nio.file.attribute.BasicFileAttributeView;
|
||||||
|
import java.nio.file.attribute.BasicFileAttributes;
|
||||||
|
import java.nio.file.attribute.FileOwnerAttributeView;
|
||||||
|
import java.nio.file.attribute.PosixFileAttributeView;
|
||||||
|
import java.nio.file.attribute.PosixFilePermissions;
|
||||||
|
import java.text.DateFormat;
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
|
import org.apache.nifi.processor.DataUnit;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.apache.nifi.processors.standard.util.FileInfo;
|
||||||
|
|
||||||
|
@TriggerSerially
|
||||||
|
@InputRequirement(Requirement.INPUT_FORBIDDEN)
|
||||||
|
@Tags({"file", "get", "list", "ingest", "source", "filesystem"})
|
||||||
|
@CapabilityDescription("Retrieves a listing of files from the local filesystem. For each file that is listed, " +
|
||||||
|
"creates a FlowFile that represents the file so that it can be fetched in conjunction with ListFile. This " +
|
||||||
|
"Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new " +
|
||||||
|
"Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike " +
|
||||||
|
"GetFile, this Processor does not delete any data from the local filesystem.")
|
||||||
|
@WritesAttributes({
|
||||||
|
@WritesAttribute(attribute="filename", description="The name of the file that was read from filesystem."),
|
||||||
|
@WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory " +
|
||||||
|
"on filesystem. For example, if the Directory property is set to /tmp, then files picked up from " +
|
||||||
|
"/tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to " +
|
||||||
|
"true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to " +
|
||||||
|
"\"/tmp/abc/1/2/3\"."),
|
||||||
|
@WritesAttribute(attribute="fs.owner", description="The user that owns the file in filesystem"),
|
||||||
|
@WritesAttribute(attribute="fs.group", description="The group that owns the file in filesystem"),
|
||||||
|
@WritesAttribute(attribute="fs.lastModified", description="The timestamp of when the file in filesystem was " +
|
||||||
|
"last modified, as milliseconds since midnight Jan 1, 1970 UTC"),
|
||||||
|
@WritesAttribute(attribute="fs.length", description="The number of bytes in the file in filesystem"),
|
||||||
|
@WritesAttribute(attribute="fs.permissions", description="The permissions for the file in filesystem. This " +
|
||||||
|
"is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example " +
|
||||||
|
"rw-rw-r--")
|
||||||
|
})
|
||||||
|
@SeeAlso({GetFile.class, PutFile.class})
|
||||||
|
public class ListFile extends AbstractListProcessor<FileInfo> {
|
||||||
|
|
||||||
|
public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
|
||||||
|
.name("Input Directory")
|
||||||
|
.description("The input directory from which files to pull files")
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
|
||||||
|
.name("Recurse Subdirectories")
|
||||||
|
.description("Indicates whether to list files from subdirectories of the directory")
|
||||||
|
.required(true)
|
||||||
|
.allowableValues("true", "false")
|
||||||
|
.defaultValue("true")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
|
||||||
|
.name("File Filter")
|
||||||
|
.description("Only files whose names match the given regular expression will be picked up")
|
||||||
|
.required(true)
|
||||||
|
.defaultValue("[^\\.].*")
|
||||||
|
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
|
||||||
|
.name("Path Filter")
|
||||||
|
.description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
|
||||||
|
public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
|
||||||
|
.name("Minimum File Age")
|
||||||
|
.description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||||
|
.defaultValue("0 sec")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
|
||||||
|
.name("Maximum File Age")
|
||||||
|
.description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
|
||||||
|
.name("Minimum File Size")
|
||||||
|
.description("The minimum size that a file must be in order to be pulled")
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
|
||||||
|
.defaultValue("0 B")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
|
||||||
|
.name("Maximum File Size")
|
||||||
|
.description("The maximum size that a file can be in order to be pulled")
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
|
||||||
|
.name("Ignore Hidden Files")
|
||||||
|
.description("Indicates whether or not hidden files should be ignored")
|
||||||
|
.allowableValues("true", "false")
|
||||||
|
.defaultValue("true")
|
||||||
|
.required(true)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
private List<PropertyDescriptor> properties;
|
||||||
|
private Set<Relationship> relationships;
|
||||||
|
private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>();
|
||||||
|
|
||||||
|
public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime";
|
||||||
|
public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
|
||||||
|
public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = "file.lastAccessTime";
|
||||||
|
public static final String FILE_SIZE_ATTRIBUTE = "file.size";
|
||||||
|
public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
|
||||||
|
public static final String FILE_GROUP_ATTRIBUTE = "file.group";
|
||||||
|
public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
|
||||||
|
public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void init(final ProcessorInitializationContext context) {
|
||||||
|
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||||
|
properties.add(DIRECTORY);
|
||||||
|
properties.add(RECURSE);
|
||||||
|
properties.add(FILE_FILTER);
|
||||||
|
properties.add(PATH_FILTER);
|
||||||
|
properties.add(MIN_AGE);
|
||||||
|
properties.add(MAX_AGE);
|
||||||
|
properties.add(MIN_SIZE);
|
||||||
|
properties.add(MAX_SIZE);
|
||||||
|
properties.add(IGNORE_HIDDEN_FILES);
|
||||||
|
this.properties = Collections.unmodifiableList(properties);
|
||||||
|
|
||||||
|
final Set<Relationship> relationships = new HashSet<>();
|
||||||
|
relationships.add(REL_SUCCESS);
|
||||||
|
this.relationships = Collections.unmodifiableSet(relationships);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<Relationship> getRelationships() {
|
||||||
|
return relationships;
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnScheduled
|
||||||
|
public void onScheduled(final ProcessContext context) {
|
||||||
|
fileFilterRef.set(createFileFilter(context));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Map<String, String> createAttributes(final FileInfo fileInfo, final ProcessContext context) {
|
||||||
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
|
|
||||||
|
final String fullPath = fileInfo.getFullPathFileName();
|
||||||
|
final File file = new File(fullPath);
|
||||||
|
final Path filePath = file.toPath();
|
||||||
|
final Path directoryPath = new File(getPath(context)).toPath();
|
||||||
|
|
||||||
|
final Path relativePath = directoryPath.relativize(filePath.getParent());
|
||||||
|
String relativePathString = relativePath.toString() + "/";
|
||||||
|
if (relativePathString.isEmpty()) {
|
||||||
|
relativePathString = "./";
|
||||||
|
}
|
||||||
|
final Path absPath = filePath.toAbsolutePath();
|
||||||
|
final String absPathString = absPath.getParent().toString() + "/";
|
||||||
|
|
||||||
|
attributes.put(CoreAttributes.PATH.key(), relativePathString);
|
||||||
|
attributes.put(CoreAttributes.FILENAME.key(), fileInfo.getFileName());
|
||||||
|
attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
|
||||||
|
|
||||||
|
try {
|
||||||
|
FileStore store = Files.getFileStore(filePath);
|
||||||
|
if (store.supportsFileAttributeView("basic")) {
|
||||||
|
try {
|
||||||
|
final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
|
||||||
|
BasicFileAttributeView view = Files.getFileAttributeView(filePath, BasicFileAttributeView.class);
|
||||||
|
BasicFileAttributes attrs = view.readAttributes();
|
||||||
|
attributes.put(FILE_SIZE_ATTRIBUTE, Long.toString(attrs.size()));
|
||||||
|
attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastModifiedTime().toMillis())));
|
||||||
|
attributes.put(FILE_CREATION_TIME_ATTRIBUTE, formatter.format(new Date(attrs.creationTime().toMillis())));
|
||||||
|
attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastAccessTime().toMillis())));
|
||||||
|
} catch (Exception ignore) {
|
||||||
|
} // allow other attributes if these fail
|
||||||
|
}
|
||||||
|
if (store.supportsFileAttributeView("owner")) {
|
||||||
|
try {
|
||||||
|
FileOwnerAttributeView view = Files.getFileAttributeView(filePath, FileOwnerAttributeView.class);
|
||||||
|
attributes.put(FILE_OWNER_ATTRIBUTE, view.getOwner().getName());
|
||||||
|
} catch (Exception ignore) {
|
||||||
|
} // allow other attributes if these fail
|
||||||
|
}
|
||||||
|
if (store.supportsFileAttributeView("posix")) {
|
||||||
|
try {
|
||||||
|
PosixFileAttributeView view = Files.getFileAttributeView(filePath, PosixFileAttributeView.class);
|
||||||
|
attributes.put(FILE_PERMISSIONS_ATTRIBUTE, PosixFilePermissions.toString(view.readAttributes().permissions()));
|
||||||
|
attributes.put(FILE_GROUP_ATTRIBUTE, view.readAttributes().group().getName());
|
||||||
|
} catch (Exception ignore) {
|
||||||
|
} // allow other attributes if these fail
|
||||||
|
}
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
// well then this FlowFile gets none of these attributes
|
||||||
|
getLogger().warn("Error collecting attributes for file {}, message is {}",
|
||||||
|
new Object[]{absPathString, ioe.getMessage()});
|
||||||
|
}
|
||||||
|
|
||||||
|
return attributes;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getPath(final ProcessContext context) {
|
||||||
|
return context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<FileInfo> performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
|
||||||
|
final File path = new File(getPath(context));
|
||||||
|
final Boolean recurse = context.getProperty(RECURSE).asBoolean();
|
||||||
|
return scanDirectory(path, fileFilterRef.get(), recurse, minTimestamp);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean isListingResetNecessary(final PropertyDescriptor property) {
|
||||||
|
return DIRECTORY.equals(property)
|
||||||
|
|| RECURSE.equals(property)
|
||||||
|
|| FILE_FILTER.equals(property)
|
||||||
|
|| PATH_FILTER.equals(property)
|
||||||
|
|| MIN_AGE.equals(property)
|
||||||
|
|| MAX_AGE.equals(property)
|
||||||
|
|| MIN_SIZE.equals(property)
|
||||||
|
|| MAX_SIZE.equals(property)
|
||||||
|
|| IGNORE_HIDDEN_FILES.equals(property);
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<FileInfo> scanDirectory(final File path, final FileFilter filter, final Boolean recurse,
|
||||||
|
final Long minTimestamp) throws IOException {
|
||||||
|
final List<FileInfo> listing = new ArrayList<>();
|
||||||
|
File[] files = path.listFiles();
|
||||||
|
if (files != null) {
|
||||||
|
for (File file : files) {
|
||||||
|
if (file.isDirectory()) {
|
||||||
|
if (recurse) {
|
||||||
|
listing.addAll(scanDirectory(file, filter, true, minTimestamp));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if ((minTimestamp == null || file.lastModified() >= minTimestamp) && filter.accept(file)) {
|
||||||
|
listing.add(new FileInfo.Builder()
|
||||||
|
.directory(file.isDirectory())
|
||||||
|
.filename(file.getName())
|
||||||
|
.fullPathFileName(file.getAbsolutePath())
|
||||||
|
.lastModifiedTime(file.lastModified())
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return listing;
|
||||||
|
}
|
||||||
|
|
||||||
|
private FileFilter createFileFilter(final ProcessContext context) {
|
||||||
|
final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
|
||||||
|
final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
|
||||||
|
final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
|
||||||
|
final Long maxAge = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
|
||||||
|
final boolean ignoreHidden = context.getProperty(IGNORE_HIDDEN_FILES).asBoolean();
|
||||||
|
final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
|
||||||
|
final String indir = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
|
||||||
|
final boolean recurseDirs = context.getProperty(RECURSE).asBoolean();
|
||||||
|
final String pathPatternStr = context.getProperty(PATH_FILTER).getValue();
|
||||||
|
final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr);
|
||||||
|
|
||||||
|
return new FileFilter() {
|
||||||
|
@Override
|
||||||
|
public boolean accept(final File file) {
|
||||||
|
if (minSize > file.length()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (maxSize != null && maxSize < file.length()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
final long fileAge = System.currentTimeMillis() - file.lastModified();
|
||||||
|
if (minAge > fileAge) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (maxAge != null && maxAge < fileAge) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (ignoreHidden && file.isHidden()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (pathPattern != null) {
|
||||||
|
Path reldir = Paths.get(indir).relativize(file.toPath()).getParent();
|
||||||
|
if (reldir != null && !reldir.toString().isEmpty()) {
|
||||||
|
if (!pathPattern.matcher(reldir.toString()).matches()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//Verify that we have at least read permissions on the file we're considering grabbing
|
||||||
|
if (!Files.isReadable(file.toPath())) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return filePattern.matcher(file.getName()).matches();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -47,6 +47,7 @@ org.apache.nifi.processors.standard.ListenHTTP
|
||||||
org.apache.nifi.processors.standard.ListenSyslog
|
org.apache.nifi.processors.standard.ListenSyslog
|
||||||
org.apache.nifi.processors.standard.ListenUDP
|
org.apache.nifi.processors.standard.ListenUDP
|
||||||
org.apache.nifi.processors.standard.ListSFTP
|
org.apache.nifi.processors.standard.ListSFTP
|
||||||
|
org.apache.nifi.processors.standard.ListFile
|
||||||
org.apache.nifi.processors.standard.LogAttribute
|
org.apache.nifi.processors.standard.LogAttribute
|
||||||
org.apache.nifi.processors.standard.MergeContent
|
org.apache.nifi.processors.standard.MergeContent
|
||||||
org.apache.nifi.processors.standard.ModifyBytes
|
org.apache.nifi.processors.standard.ModifyBytes
|
||||||
|
|
|
@ -0,0 +1,628 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
|
import org.apache.nifi.util.TestRunner;
|
||||||
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.FileStore;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.attribute.PosixFilePermission;
|
||||||
|
import java.text.DateFormat;
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
public class TestListFile {
|
||||||
|
|
||||||
|
final String TESTDIR = "target/test/data/in";
|
||||||
|
final File testDir = new File(TESTDIR);
|
||||||
|
ListFile processor;
|
||||||
|
TestRunner runner;
|
||||||
|
ProcessContext context;
|
||||||
|
|
||||||
|
// Testing factors in milliseconds for file ages that are configured on each run by resetAges()
|
||||||
|
// age#millis are relative time references
|
||||||
|
// time#millis are absolute time references
|
||||||
|
// age#filter are filter label strings for the filter properties
|
||||||
|
Long syncTime = System.currentTimeMillis();
|
||||||
|
Long time0millis, time1millis, time2millis, time3millis, time4millis, time5millis;
|
||||||
|
Long age0millis, age1millis, age2millis, age3millis, age4millis, age5millis;
|
||||||
|
String age0, age1, age2, age3, age4, age5;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
processor = new ListFile();
|
||||||
|
runner = TestRunners.newTestRunner(processor);
|
||||||
|
context = runner.getProcessContext();
|
||||||
|
deleteDirectory(testDir);
|
||||||
|
assertTrue("Unable to create test data directory " + testDir.getAbsolutePath(), testDir.exists() || testDir.mkdirs());
|
||||||
|
resetAges();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
deleteDirectory(testDir);
|
||||||
|
File tempFile = processor.getPersistenceFile();
|
||||||
|
if (tempFile.exists()) {
|
||||||
|
File[] stateFiles = tempFile.getParentFile().listFiles();
|
||||||
|
if (stateFiles != null) {
|
||||||
|
for (File stateFile : stateFiles) {
|
||||||
|
assertTrue(stateFile.delete());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetSupportedPropertyDescriptors() throws Exception {
|
||||||
|
List<PropertyDescriptor> properties = processor.getSupportedPropertyDescriptors();
|
||||||
|
assertEquals(9, properties.size());
|
||||||
|
assertEquals(ListFile.DIRECTORY, properties.get(0));
|
||||||
|
assertEquals(ListFile.RECURSE, properties.get(1));
|
||||||
|
assertEquals(ListFile.FILE_FILTER, properties.get(2));
|
||||||
|
assertEquals(ListFile.PATH_FILTER, properties.get(3));
|
||||||
|
assertEquals(ListFile.MIN_AGE, properties.get(4));
|
||||||
|
assertEquals(ListFile.MAX_AGE, properties.get(5));
|
||||||
|
assertEquals(ListFile.MIN_SIZE, properties.get(6));
|
||||||
|
assertEquals(ListFile.MAX_SIZE, properties.get(7));
|
||||||
|
assertEquals(ListFile.IGNORE_HIDDEN_FILES, properties.get(8));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetRelationships() throws Exception {
|
||||||
|
Set<Relationship> relationships = processor.getRelationships();
|
||||||
|
assertEquals(1, relationships.size());
|
||||||
|
assertEquals(AbstractListProcessor.REL_SUCCESS, relationships.toArray()[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetPath() {
|
||||||
|
runner.setProperty(ListFile.DIRECTORY, "/dir/test1");
|
||||||
|
assertEquals("/dir/test1", processor.getPath(context));
|
||||||
|
runner.setProperty(ListFile.DIRECTORY, "${literal(\"/DIR/TEST2\"):toLower()}");
|
||||||
|
assertEquals("/dir/test2", processor.getPath(context));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPerformListing() throws Exception {
|
||||||
|
// create first file
|
||||||
|
final File file1 = new File(TESTDIR + "/listing1.txt");
|
||||||
|
assertTrue(file1.createNewFile());
|
||||||
|
assertTrue(file1.setLastModified(time4millis));
|
||||||
|
|
||||||
|
// process first file and set new timestamp
|
||||||
|
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(1, successFiles.size());
|
||||||
|
|
||||||
|
// create second file
|
||||||
|
final File file2 = new File(TESTDIR + "/listing2.txt");
|
||||||
|
assertTrue(file2.createNewFile());
|
||||||
|
assertTrue(file2.setLastModified(time2millis));
|
||||||
|
|
||||||
|
// process second file after timestamp
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(1, successFiles2.size());
|
||||||
|
|
||||||
|
// create third file
|
||||||
|
final File file3 = new File(TESTDIR + "/listing3.txt");
|
||||||
|
assertTrue(file3.createNewFile());
|
||||||
|
assertTrue(file3.setLastModified(time4millis));
|
||||||
|
|
||||||
|
// process third file before timestamp
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(0, successFiles3.size());
|
||||||
|
|
||||||
|
// force state to reset and process all files
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.removeProperty(ListFile.DIRECTORY);
|
||||||
|
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(3, successFiles4.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFilterAge() throws IOException {
|
||||||
|
final File file1 = new File(TESTDIR + "/age1.txt");
|
||||||
|
assertTrue(file1.createNewFile());
|
||||||
|
assertTrue(file1.setLastModified(time0millis));
|
||||||
|
|
||||||
|
final File file2 = new File(TESTDIR + "/age2.txt");
|
||||||
|
assertTrue(file2.createNewFile());
|
||||||
|
assertTrue(file2.setLastModified(time2millis));
|
||||||
|
|
||||||
|
final File file3 = new File(TESTDIR + "/age3.txt");
|
||||||
|
assertTrue(file3.createNewFile());
|
||||||
|
assertTrue(file3.setLastModified(time4millis));
|
||||||
|
|
||||||
|
// check all files
|
||||||
|
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(3, successFiles1.size());
|
||||||
|
|
||||||
|
// exclude oldest
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.setProperty(ListFile.MIN_AGE, age0);
|
||||||
|
runner.setProperty(ListFile.MAX_AGE, age3);
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(2, successFiles2.size());
|
||||||
|
|
||||||
|
// exclude newest
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.setProperty(ListFile.MIN_AGE, age1);
|
||||||
|
runner.setProperty(ListFile.MAX_AGE, age5);
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(2, successFiles3.size());
|
||||||
|
|
||||||
|
// exclude oldest and newest
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.setProperty(ListFile.MIN_AGE, age1);
|
||||||
|
runner.setProperty(ListFile.MAX_AGE, age3);
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(1, successFiles4.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFilterSize() throws IOException {
|
||||||
|
final byte[] bytes1000 = new byte[1000];
|
||||||
|
final byte[] bytes5000 = new byte[5000];
|
||||||
|
final byte[] bytes10000 = new byte[10000];
|
||||||
|
FileOutputStream fos;
|
||||||
|
|
||||||
|
final File file1 = new File(TESTDIR + "/size1.txt");
|
||||||
|
assertTrue(file1.createNewFile());
|
||||||
|
fos = new FileOutputStream(file1);
|
||||||
|
fos.write(bytes10000);
|
||||||
|
fos.close();
|
||||||
|
|
||||||
|
final File file2 = new File(TESTDIR + "/size2.txt");
|
||||||
|
assertTrue(file2.createNewFile());
|
||||||
|
fos = new FileOutputStream(file2);
|
||||||
|
fos.write(bytes5000);
|
||||||
|
fos.close();
|
||||||
|
|
||||||
|
final File file3 = new File(TESTDIR + "/size3.txt");
|
||||||
|
assertTrue(file3.createNewFile());
|
||||||
|
fos = new FileOutputStream(file3);
|
||||||
|
fos.write(bytes1000);
|
||||||
|
fos.close();
|
||||||
|
|
||||||
|
// check all files
|
||||||
|
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(3, successFiles1.size());
|
||||||
|
|
||||||
|
// exclude largest
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.removeProperty(ListFile.MIN_AGE);
|
||||||
|
runner.removeProperty(ListFile.MAX_AGE);
|
||||||
|
runner.setProperty(ListFile.MIN_SIZE, "0 b");
|
||||||
|
runner.setProperty(ListFile.MAX_SIZE, "7500 b");
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(2, successFiles2.size());
|
||||||
|
|
||||||
|
// exclude smallest
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.removeProperty(ListFile.MIN_AGE);
|
||||||
|
runner.removeProperty(ListFile.MAX_AGE);
|
||||||
|
runner.setProperty(ListFile.MIN_SIZE, "2500 b");
|
||||||
|
runner.removeProperty(ListFile.MAX_SIZE);
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(2, successFiles3.size());
|
||||||
|
|
||||||
|
// exclude oldest and newest
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.removeProperty(ListFile.MIN_AGE);
|
||||||
|
runner.removeProperty(ListFile.MAX_AGE);
|
||||||
|
runner.setProperty(ListFile.MIN_SIZE, "2500 b");
|
||||||
|
runner.setProperty(ListFile.MAX_SIZE, "7500 b");
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(1, successFiles4.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFilterHidden() throws IOException {
|
||||||
|
FileOutputStream fos;
|
||||||
|
|
||||||
|
final File file1 = new File(TESTDIR + "/hidden1.txt");
|
||||||
|
assertTrue(file1.createNewFile());
|
||||||
|
fos = new FileOutputStream(file1);
|
||||||
|
fos.close();
|
||||||
|
|
||||||
|
final File file2 = new File(TESTDIR + "/.hidden2.txt");
|
||||||
|
assertTrue(file2.createNewFile());
|
||||||
|
fos = new FileOutputStream(file2);
|
||||||
|
fos.close();
|
||||||
|
FileStore store = Files.getFileStore(file2.toPath());
|
||||||
|
if (store.supportsFileAttributeView("dos")) {
|
||||||
|
Files.setAttribute(file2.toPath(), "dos:hidden", true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// check all files
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
|
||||||
|
runner.setProperty(ListFile.FILE_FILTER, ".*");
|
||||||
|
runner.removeProperty(ListFile.MIN_AGE);
|
||||||
|
runner.removeProperty(ListFile.MAX_AGE);
|
||||||
|
runner.removeProperty(ListFile.MIN_SIZE);
|
||||||
|
runner.removeProperty(ListFile.MAX_SIZE);
|
||||||
|
runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "false");
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(2, successFiles1.size());
|
||||||
|
|
||||||
|
// exclude hidden
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "true");
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(1, successFiles2.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFilterFilePattern() throws IOException {
|
||||||
|
final File file1 = new File(TESTDIR + "/file1-abc-apple.txt");
|
||||||
|
assertTrue(file1.createNewFile());
|
||||||
|
|
||||||
|
final File file2 = new File(TESTDIR + "/file2-xyz-apple.txt");
|
||||||
|
assertTrue(file2.createNewFile());
|
||||||
|
|
||||||
|
final File file3 = new File(TESTDIR + "/file3-xyz-banana.txt");
|
||||||
|
assertTrue(file3.createNewFile());
|
||||||
|
|
||||||
|
final File file4 = new File(TESTDIR + "/file4-pdq-banana.txt");
|
||||||
|
assertTrue(file4.createNewFile());
|
||||||
|
|
||||||
|
// check all files
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
|
||||||
|
runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue());
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(4, successFiles1.size());
|
||||||
|
|
||||||
|
// filter file on pattern
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.setProperty(ListFile.FILE_FILTER, ".*-xyz-.*");
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(2, successFiles2.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFilterPathPattern() throws IOException {
|
||||||
|
final File subdir1 = new File(TESTDIR + "/subdir1");
|
||||||
|
assertTrue(subdir1.mkdirs());
|
||||||
|
|
||||||
|
final File subdir2 = new File(TESTDIR + "/subdir1/subdir2");
|
||||||
|
assertTrue(subdir2.mkdirs());
|
||||||
|
|
||||||
|
final File file1 = new File(TESTDIR + "/file1.txt");
|
||||||
|
assertTrue(file1.createNewFile());
|
||||||
|
|
||||||
|
final File file2 = new File(TESTDIR + "/subdir1/file2.txt");
|
||||||
|
assertTrue(file2.createNewFile());
|
||||||
|
|
||||||
|
final File file3 = new File(TESTDIR + "/subdir1/subdir2/file3.txt");
|
||||||
|
assertTrue(file3.createNewFile());
|
||||||
|
|
||||||
|
final File file4 = new File(TESTDIR + "/subdir1/file4.txt");
|
||||||
|
assertTrue(file4.createNewFile());
|
||||||
|
|
||||||
|
// check all files
|
||||||
|
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
|
||||||
|
runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue());
|
||||||
|
runner.setProperty(ListFile.RECURSE, "true");
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(4, successFiles1.size());
|
||||||
|
|
||||||
|
// filter path on pattern subdir1
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.setProperty(ListFile.PATH_FILTER, "subdir1");
|
||||||
|
runner.setProperty(ListFile.RECURSE, "true");
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(3, successFiles2.size());
|
||||||
|
|
||||||
|
// filter path on pattern subdir2
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.setProperty(ListFile.PATH_FILTER, "subdir2");
|
||||||
|
runner.setProperty(ListFile.RECURSE, "true");
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(1, successFiles3.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRecurse() throws IOException {
|
||||||
|
final File subdir1 = new File(TESTDIR + "/subdir1");
|
||||||
|
assertTrue(subdir1.mkdirs());
|
||||||
|
|
||||||
|
final File subdir2 = new File(TESTDIR + "/subdir1/subdir2");
|
||||||
|
assertTrue(subdir2.mkdirs());
|
||||||
|
|
||||||
|
final File file1 = new File(TESTDIR + "/file1.txt");
|
||||||
|
assertTrue(file1.createNewFile());
|
||||||
|
|
||||||
|
final File file2 = new File(TESTDIR + "/subdir1/file2.txt");
|
||||||
|
assertTrue(file2.createNewFile());
|
||||||
|
|
||||||
|
final File file3 = new File(TESTDIR + "/subdir1/subdir2/file3.txt");
|
||||||
|
assertTrue(file3.createNewFile());
|
||||||
|
|
||||||
|
// check all files
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
|
||||||
|
runner.setProperty(ListFile.RECURSE, "true");
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(3, successFiles1.size());
|
||||||
|
|
||||||
|
// exclude hidden
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.setProperty(ListFile.RECURSE, "false");
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(1, successFiles2.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadable() throws IOException {
|
||||||
|
final File file1 = new File(TESTDIR + "/file1.txt");
|
||||||
|
assertTrue(file1.createNewFile());
|
||||||
|
|
||||||
|
final File file2 = new File(TESTDIR + "/file2.txt");
|
||||||
|
assertTrue(file2.createNewFile());
|
||||||
|
|
||||||
|
final File file3 = new File(TESTDIR + "/file3.txt");
|
||||||
|
assertTrue(file3.createNewFile());
|
||||||
|
|
||||||
|
// check all files
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
|
||||||
|
runner.setProperty(ListFile.RECURSE, "true");
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(3, successFiles1.size());
|
||||||
|
|
||||||
|
// make file2 unreadable and test (setReadable() does not work on Windows)
|
||||||
|
if (!System.getProperty("os.name").toLowerCase().startsWith("windows")) {
|
||||||
|
assertTrue(file2.setReadable(false));
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.setProperty(ListFile.RECURSE, "false");
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(2, successFiles2.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAttributesSet() throws IOException {
|
||||||
|
// create temp file and time constant
|
||||||
|
final File file1 = new File(TESTDIR + "/file1.txt");
|
||||||
|
assertTrue(file1.createNewFile());
|
||||||
|
FileOutputStream fos = new FileOutputStream(file1);
|
||||||
|
fos.write(new byte[1234]);
|
||||||
|
fos.close();
|
||||||
|
assertTrue(file1.setLastModified(time3millis));
|
||||||
|
Long time3rounded = time3millis - time3millis % 1000;
|
||||||
|
String userName = System.getProperty("user.name");
|
||||||
|
|
||||||
|
// validate the file transferred
|
||||||
|
runner.clearTransferState();
|
||||||
|
runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath());
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS);
|
||||||
|
final List<MockFlowFile> successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS);
|
||||||
|
assertEquals(1, successFiles1.size());
|
||||||
|
|
||||||
|
// get attribute check values
|
||||||
|
final Path file1Path = file1.toPath();
|
||||||
|
final Path directoryPath = new File(TESTDIR).toPath();
|
||||||
|
final Path relativePath = directoryPath.relativize(file1.toPath().getParent());
|
||||||
|
String relativePathString = relativePath.toString() + "/";
|
||||||
|
final Path absolutePath = file1.toPath().toAbsolutePath();
|
||||||
|
final String absolutePathString = absolutePath.getParent().toString() + "/";
|
||||||
|
final FileStore store = Files.getFileStore(file1Path);
|
||||||
|
final DateFormat formatter = new SimpleDateFormat(ListFile.FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
|
||||||
|
final String time3Formatted = formatter.format(time3rounded);
|
||||||
|
|
||||||
|
// check standard attributes
|
||||||
|
MockFlowFile mock1 = successFiles1.get(0);
|
||||||
|
assertEquals(relativePathString, mock1.getAttribute(CoreAttributes.PATH.key()));
|
||||||
|
assertEquals("file1.txt", mock1.getAttribute(CoreAttributes.FILENAME.key()));
|
||||||
|
assertEquals(absolutePathString, mock1.getAttribute(CoreAttributes.ABSOLUTE_PATH.key()));
|
||||||
|
assertEquals("1234", mock1.getAttribute(ListFile.FILE_SIZE_ATTRIBUTE));
|
||||||
|
|
||||||
|
// check attributes dependent on views supported
|
||||||
|
if (store.supportsFileAttributeView("basic")) {
|
||||||
|
assertEquals(time3Formatted, mock1.getAttribute(ListFile.FILE_LAST_MODIFY_TIME_ATTRIBUTE));
|
||||||
|
assertNotNull(mock1.getAttribute(ListFile.FILE_CREATION_TIME_ATTRIBUTE));
|
||||||
|
assertNotNull(mock1.getAttribute(ListFile.FILE_LAST_ACCESS_TIME_ATTRIBUTE));
|
||||||
|
}
|
||||||
|
if (store.supportsFileAttributeView("owner")) {
|
||||||
|
// look for username containment to handle Windows domains as well as Unix user names
|
||||||
|
// org.junit.ComparisonFailure: expected:<[]username> but was:<[DOMAIN\]username>
|
||||||
|
assertTrue(mock1.getAttribute(ListFile.FILE_OWNER_ATTRIBUTE).contains(userName));
|
||||||
|
}
|
||||||
|
if (store.supportsFileAttributeView("posix")) {
|
||||||
|
assertEquals(userName, mock1.getAttribute(ListFile.FILE_GROUP_ATTRIBUTE));
|
||||||
|
assertEquals("rw-rw-r--", mock1.getAttribute(ListFile.FILE_PERMISSIONS_ATTRIBUTE));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIsListingResetNecessary() throws Exception {
|
||||||
|
assertEquals(true, processor.isListingResetNecessary(ListFile.DIRECTORY));
|
||||||
|
assertEquals(true, processor.isListingResetNecessary(ListFile.RECURSE));
|
||||||
|
assertEquals(true, processor.isListingResetNecessary(ListFile.FILE_FILTER));
|
||||||
|
assertEquals(true, processor.isListingResetNecessary(ListFile.PATH_FILTER));
|
||||||
|
assertEquals(true, processor.isListingResetNecessary(ListFile.MIN_AGE));
|
||||||
|
assertEquals(true, processor.isListingResetNecessary(ListFile.MAX_AGE));
|
||||||
|
assertEquals(true, processor.isListingResetNecessary(ListFile.MIN_SIZE));
|
||||||
|
assertEquals(true, processor.isListingResetNecessary(ListFile.MAX_SIZE));
|
||||||
|
assertEquals(true, processor.isListingResetNecessary(ListFile.IGNORE_HIDDEN_FILES));
|
||||||
|
assertEquals(false, processor.isListingResetNecessary(new PropertyDescriptor.Builder().name("x").build()));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void resetAges() {
|
||||||
|
syncTime = System.currentTimeMillis();
|
||||||
|
|
||||||
|
age0millis = 0L;
|
||||||
|
age1millis = 2000L;
|
||||||
|
age2millis = 5000L;
|
||||||
|
age3millis = 7000L;
|
||||||
|
age4millis = 10000L;
|
||||||
|
age5millis = 100000L;
|
||||||
|
|
||||||
|
time0millis = syncTime - age0millis;
|
||||||
|
time1millis = syncTime - age1millis;
|
||||||
|
time2millis = syncTime - age2millis;
|
||||||
|
time3millis = syncTime - age3millis;
|
||||||
|
time4millis = syncTime - age4millis;
|
||||||
|
time5millis = syncTime - age5millis;
|
||||||
|
|
||||||
|
age0 = Long.toString(age0millis) + " millis";
|
||||||
|
age1 = Long.toString(age1millis) + " millis";
|
||||||
|
age2 = Long.toString(age2millis) + " millis";
|
||||||
|
age3 = Long.toString(age3millis) + " millis";
|
||||||
|
age4 = Long.toString(age4millis) + " millis";
|
||||||
|
age5 = Long.toString(age5millis) + " millis";
|
||||||
|
}
|
||||||
|
|
||||||
|
private void deleteDirectory(final File directory) throws IOException {
|
||||||
|
if (directory.exists()) {
|
||||||
|
File[] files = directory.listFiles();
|
||||||
|
if (files != null) {
|
||||||
|
for (final File file : files) {
|
||||||
|
if (file.isDirectory()) {
|
||||||
|
deleteDirectory(file);
|
||||||
|
}
|
||||||
|
assertTrue("Could not delete " + file.getAbsolutePath(), file.delete());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String perms2string(final String permissions) {
|
||||||
|
final StringBuilder sb = new StringBuilder();
|
||||||
|
if (permissions.contains(PosixFilePermission.OWNER_READ.toString())) {
|
||||||
|
sb.append("r");
|
||||||
|
} else {
|
||||||
|
sb.append("-");
|
||||||
|
}
|
||||||
|
if (permissions.contains(PosixFilePermission.OWNER_WRITE.toString())) {
|
||||||
|
sb.append("w");
|
||||||
|
} else {
|
||||||
|
sb.append("-");
|
||||||
|
}
|
||||||
|
if (permissions.contains(PosixFilePermission.OWNER_EXECUTE.toString())) {
|
||||||
|
sb.append("x");
|
||||||
|
} else {
|
||||||
|
sb.append("-");
|
||||||
|
}
|
||||||
|
if (permissions.contains(PosixFilePermission.GROUP_READ.toString())) {
|
||||||
|
sb.append("r");
|
||||||
|
} else {
|
||||||
|
sb.append("-");
|
||||||
|
}
|
||||||
|
if (permissions.contains(PosixFilePermission.GROUP_WRITE.toString())) {
|
||||||
|
sb.append("w");
|
||||||
|
} else {
|
||||||
|
sb.append("-");
|
||||||
|
}
|
||||||
|
if (permissions.contains(PosixFilePermission.GROUP_EXECUTE.toString())) {
|
||||||
|
sb.append("x");
|
||||||
|
} else {
|
||||||
|
sb.append("-");
|
||||||
|
}
|
||||||
|
if (permissions.contains(PosixFilePermission.OTHERS_READ.toString())) {
|
||||||
|
sb.append("r");
|
||||||
|
} else {
|
||||||
|
sb.append("-");
|
||||||
|
}
|
||||||
|
if (permissions.contains(PosixFilePermission.OTHERS_WRITE.toString())) {
|
||||||
|
sb.append("w");
|
||||||
|
} else {
|
||||||
|
sb.append("-");
|
||||||
|
}
|
||||||
|
if (permissions.contains(PosixFilePermission.OTHERS_EXECUTE.toString())) {
|
||||||
|
sb.append("x");
|
||||||
|
} else {
|
||||||
|
sb.append("-");
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue