From 4c4d62c61f7c828dbcb124090992b91d631cb22e Mon Sep 17 00:00:00 2001 From: Joe Skora Date: Thu, 29 Oct 2015 01:41:58 -0400 Subject: [PATCH] NIFI-631: Added ListFile processor. Reviewed by Tony Kurc (tkurc@apache.org) --- .../nifi/processors/standard/ListFile.java | 367 ++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../processors/standard/TestListFile.java | 628 ++++++++++++++++++ 3 files changed, 996 insertions(+) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java new file mode 100644 index 0000000000..f35d9be4da --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFile.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.nio.file.FileStore; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.attribute.BasicFileAttributeView; +import java.nio.file.attribute.BasicFileAttributes; +import java.nio.file.attribute.FileOwnerAttributeView; +import java.nio.file.attribute.PosixFileAttributeView; +import java.nio.file.attribute.PosixFilePermissions; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.FileInfo; + +@TriggerSerially +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"file", "get", "list", "ingest", "source", "filesystem"}) +@CapabilityDescription("Retrieves a listing of files from the local filesystem. For each file that is listed, " + + "creates a FlowFile that represents the file so that it can be fetched in conjunction with ListFile. This " + + "Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new " + + "Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike " + + "GetFile, this Processor does not delete any data from the local filesystem.") +@WritesAttributes({ + @WritesAttribute(attribute="filename", description="The name of the file that was read from filesystem."), + @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory " + + "on filesystem. For example, if the Directory property is set to /tmp, then files picked up from " + + "/tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to " + + "true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to " + + "\"/tmp/abc/1/2/3\"."), + @WritesAttribute(attribute="fs.owner", description="The user that owns the file in filesystem"), + @WritesAttribute(attribute="fs.group", description="The group that owns the file in filesystem"), + @WritesAttribute(attribute="fs.lastModified", description="The timestamp of when the file in filesystem was " + + "last modified, as milliseconds since midnight Jan 1, 1970 UTC"), + @WritesAttribute(attribute="fs.length", description="The number of bytes in the file in filesystem"), + @WritesAttribute(attribute="fs.permissions", description="The permissions for the file in filesystem. This " + + "is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example " + + "rw-rw-r--") +}) +@SeeAlso({GetFile.class, PutFile.class}) +public class ListFile extends AbstractListProcessor { + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .name("Input Directory") + .description("The input directory from which files to pull files") + .required(true) + .addValidator(StandardValidators.createDirectoryExistsValidator(true, false)) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder() + .name("Recurse Subdirectories") + .description("Indicates whether to list files from subdirectories of the directory") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder() + .name("File Filter") + .description("Only files whose names match the given regular expression will be picked up") + .required(true) + .defaultValue("[^\\.].*") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .build(); + + public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder() + .name("Path Filter") + .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned") + .required(false) + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .build(); + + + public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder() + .name("Minimum File Age") + .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("0 sec") + .build(); + + public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder() + .name("Maximum File Age") + .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored") + .required(false) + .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)) + .build(); + + public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder() + .name("Minimum File Size") + .description("The minimum size that a file must be in order to be pulled") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("0 B") + .build(); + + public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder() + .name("Maximum File Size") + .description("The maximum size that a file can be in order to be pulled") + .required(false) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder() + .name("Ignore Hidden Files") + .description("Indicates whether or not hidden files should be ignored") + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .build(); + + private List properties; + private Set relationships; + private final AtomicReference fileFilterRef = new AtomicReference<>(); + + public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime"; + public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime"; + public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = "file.lastAccessTime"; + public static final String FILE_SIZE_ATTRIBUTE = "file.size"; + public static final String FILE_OWNER_ATTRIBUTE = "file.owner"; + public static final String FILE_GROUP_ATTRIBUTE = "file.group"; + public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions"; + public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ"; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List properties = new ArrayList<>(); + properties.add(DIRECTORY); + properties.add(RECURSE); + properties.add(FILE_FILTER); + properties.add(PATH_FILTER); + properties.add(MIN_AGE); + properties.add(MAX_AGE); + properties.add(MIN_SIZE); + properties.add(MAX_SIZE); + properties.add(IGNORE_HIDDEN_FILES); + this.properties = Collections.unmodifiableList(properties); + + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set getRelationships() { + return relationships; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + fileFilterRef.set(createFileFilter(context)); + } + + @Override + protected Map createAttributes(final FileInfo fileInfo, final ProcessContext context) { + final Map attributes = new HashMap<>(); + + final String fullPath = fileInfo.getFullPathFileName(); + final File file = new File(fullPath); + final Path filePath = file.toPath(); + final Path directoryPath = new File(getPath(context)).toPath(); + + final Path relativePath = directoryPath.relativize(filePath.getParent()); + String relativePathString = relativePath.toString() + "/"; + if (relativePathString.isEmpty()) { + relativePathString = "./"; + } + final Path absPath = filePath.toAbsolutePath(); + final String absPathString = absPath.getParent().toString() + "/"; + + attributes.put(CoreAttributes.PATH.key(), relativePathString); + attributes.put(CoreAttributes.FILENAME.key(), fileInfo.getFileName()); + attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), absPathString); + + try { + FileStore store = Files.getFileStore(filePath); + if (store.supportsFileAttributeView("basic")) { + try { + final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US); + BasicFileAttributeView view = Files.getFileAttributeView(filePath, BasicFileAttributeView.class); + BasicFileAttributes attrs = view.readAttributes(); + attributes.put(FILE_SIZE_ATTRIBUTE, Long.toString(attrs.size())); + attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastModifiedTime().toMillis()))); + attributes.put(FILE_CREATION_TIME_ATTRIBUTE, formatter.format(new Date(attrs.creationTime().toMillis()))); + attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastAccessTime().toMillis()))); + } catch (Exception ignore) { + } // allow other attributes if these fail + } + if (store.supportsFileAttributeView("owner")) { + try { + FileOwnerAttributeView view = Files.getFileAttributeView(filePath, FileOwnerAttributeView.class); + attributes.put(FILE_OWNER_ATTRIBUTE, view.getOwner().getName()); + } catch (Exception ignore) { + } // allow other attributes if these fail + } + if (store.supportsFileAttributeView("posix")) { + try { + PosixFileAttributeView view = Files.getFileAttributeView(filePath, PosixFileAttributeView.class); + attributes.put(FILE_PERMISSIONS_ATTRIBUTE, PosixFilePermissions.toString(view.readAttributes().permissions())); + attributes.put(FILE_GROUP_ATTRIBUTE, view.readAttributes().group().getName()); + } catch (Exception ignore) { + } // allow other attributes if these fail + } + } catch (IOException ioe) { + // well then this FlowFile gets none of these attributes + getLogger().warn("Error collecting attributes for file {}, message is {}", + new Object[]{absPathString, ioe.getMessage()}); + } + + return attributes; + } + + @Override + protected String getPath(final ProcessContext context) { + return context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue(); + } + + @Override + protected List performListing(final ProcessContext context, final Long minTimestamp) throws IOException { + final File path = new File(getPath(context)); + final Boolean recurse = context.getProperty(RECURSE).asBoolean(); + return scanDirectory(path, fileFilterRef.get(), recurse, minTimestamp); + } + + @Override + protected boolean isListingResetNecessary(final PropertyDescriptor property) { + return DIRECTORY.equals(property) + || RECURSE.equals(property) + || FILE_FILTER.equals(property) + || PATH_FILTER.equals(property) + || MIN_AGE.equals(property) + || MAX_AGE.equals(property) + || MIN_SIZE.equals(property) + || MAX_SIZE.equals(property) + || IGNORE_HIDDEN_FILES.equals(property); + } + + private List scanDirectory(final File path, final FileFilter filter, final Boolean recurse, + final Long minTimestamp) throws IOException { + final List listing = new ArrayList<>(); + File[] files = path.listFiles(); + if (files != null) { + for (File file : files) { + if (file.isDirectory()) { + if (recurse) { + listing.addAll(scanDirectory(file, filter, true, minTimestamp)); + } + } else { + if ((minTimestamp == null || file.lastModified() >= minTimestamp) && filter.accept(file)) { + listing.add(new FileInfo.Builder() + .directory(file.isDirectory()) + .filename(file.getName()) + .fullPathFileName(file.getAbsolutePath()) + .lastModifiedTime(file.lastModified()) + .build()); + } + } + } + } + + return listing; + } + + private FileFilter createFileFilter(final ProcessContext context) { + final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue(); + final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B); + final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + final Long maxAge = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS); + final boolean ignoreHidden = context.getProperty(IGNORE_HIDDEN_FILES).asBoolean(); + final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue()); + final String indir = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue(); + final boolean recurseDirs = context.getProperty(RECURSE).asBoolean(); + final String pathPatternStr = context.getProperty(PATH_FILTER).getValue(); + final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr); + + return new FileFilter() { + @Override + public boolean accept(final File file) { + if (minSize > file.length()) { + return false; + } + if (maxSize != null && maxSize < file.length()) { + return false; + } + final long fileAge = System.currentTimeMillis() - file.lastModified(); + if (minAge > fileAge) { + return false; + } + if (maxAge != null && maxAge < fileAge) { + return false; + } + if (ignoreHidden && file.isHidden()) { + return false; + } + if (pathPattern != null) { + Path reldir = Paths.get(indir).relativize(file.toPath()).getParent(); + if (reldir != null && !reldir.toString().isEmpty()) { + if (!pathPattern.matcher(reldir.toString()).matches()) { + return false; + } + } + } + //Verify that we have at least read permissions on the file we're considering grabbing + if (!Files.isReadable(file.toPath())) { + return false; + } + return filePattern.matcher(file.getName()).matches(); + } + }; + } + +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 08a1f96016..eb104312ef 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -47,6 +47,7 @@ org.apache.nifi.processors.standard.ListenHTTP org.apache.nifi.processors.standard.ListenSyslog org.apache.nifi.processors.standard.ListenUDP org.apache.nifi.processors.standard.ListSFTP +org.apache.nifi.processors.standard.ListFile org.apache.nifi.processors.standard.LogAttribute org.apache.nifi.processors.standard.MergeContent org.apache.nifi.processors.standard.ModifyBytes diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java new file mode 100644 index 0000000000..48232442a1 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListFile.java @@ -0,0 +1,628 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.FileStore; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.PosixFilePermission; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.List; +import java.util.Locale; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TestListFile { + + final String TESTDIR = "target/test/data/in"; + final File testDir = new File(TESTDIR); + ListFile processor; + TestRunner runner; + ProcessContext context; + + // Testing factors in milliseconds for file ages that are configured on each run by resetAges() + // age#millis are relative time references + // time#millis are absolute time references + // age#filter are filter label strings for the filter properties + Long syncTime = System.currentTimeMillis(); + Long time0millis, time1millis, time2millis, time3millis, time4millis, time5millis; + Long age0millis, age1millis, age2millis, age3millis, age4millis, age5millis; + String age0, age1, age2, age3, age4, age5; + + @Before + public void setUp() throws Exception { + processor = new ListFile(); + runner = TestRunners.newTestRunner(processor); + context = runner.getProcessContext(); + deleteDirectory(testDir); + assertTrue("Unable to create test data directory " + testDir.getAbsolutePath(), testDir.exists() || testDir.mkdirs()); + resetAges(); + } + + @After + public void tearDown() throws Exception { + deleteDirectory(testDir); + File tempFile = processor.getPersistenceFile(); + if (tempFile.exists()) { + File[] stateFiles = tempFile.getParentFile().listFiles(); + if (stateFiles != null) { + for (File stateFile : stateFiles) { + assertTrue(stateFile.delete()); + } + } + } + } + + @Test + public void testGetSupportedPropertyDescriptors() throws Exception { + List properties = processor.getSupportedPropertyDescriptors(); + assertEquals(9, properties.size()); + assertEquals(ListFile.DIRECTORY, properties.get(0)); + assertEquals(ListFile.RECURSE, properties.get(1)); + assertEquals(ListFile.FILE_FILTER, properties.get(2)); + assertEquals(ListFile.PATH_FILTER, properties.get(3)); + assertEquals(ListFile.MIN_AGE, properties.get(4)); + assertEquals(ListFile.MAX_AGE, properties.get(5)); + assertEquals(ListFile.MIN_SIZE, properties.get(6)); + assertEquals(ListFile.MAX_SIZE, properties.get(7)); + assertEquals(ListFile.IGNORE_HIDDEN_FILES, properties.get(8)); + } + + @Test + public void testGetRelationships() throws Exception { + Set relationships = processor.getRelationships(); + assertEquals(1, relationships.size()); + assertEquals(AbstractListProcessor.REL_SUCCESS, relationships.toArray()[0]); + } + + @Test + public void testGetPath() { + runner.setProperty(ListFile.DIRECTORY, "/dir/test1"); + assertEquals("/dir/test1", processor.getPath(context)); + runner.setProperty(ListFile.DIRECTORY, "${literal(\"/DIR/TEST2\"):toLower()}"); + assertEquals("/dir/test2", processor.getPath(context)); + } + + @Test + public void testPerformListing() throws Exception { + // create first file + final File file1 = new File(TESTDIR + "/listing1.txt"); + assertTrue(file1.createNewFile()); + assertTrue(file1.setLastModified(time4millis)); + + // process first file and set new timestamp + runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(1, successFiles.size()); + + // create second file + final File file2 = new File(TESTDIR + "/listing2.txt"); + assertTrue(file2.createNewFile()); + assertTrue(file2.setLastModified(time2millis)); + + // process second file after timestamp + runner.clearTransferState(); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(1, successFiles2.size()); + + // create third file + final File file3 = new File(TESTDIR + "/listing3.txt"); + assertTrue(file3.createNewFile()); + assertTrue(file3.setLastModified(time4millis)); + + // process third file before timestamp + runner.clearTransferState(); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(0, successFiles3.size()); + + // force state to reset and process all files + runner.clearTransferState(); + runner.removeProperty(ListFile.DIRECTORY); + runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(3, successFiles4.size()); + } + + @Test + public void testFilterAge() throws IOException { + final File file1 = new File(TESTDIR + "/age1.txt"); + assertTrue(file1.createNewFile()); + assertTrue(file1.setLastModified(time0millis)); + + final File file2 = new File(TESTDIR + "/age2.txt"); + assertTrue(file2.createNewFile()); + assertTrue(file2.setLastModified(time2millis)); + + final File file3 = new File(TESTDIR + "/age3.txt"); + assertTrue(file3.createNewFile()); + assertTrue(file3.setLastModified(time4millis)); + + // check all files + runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(3, successFiles1.size()); + + // exclude oldest + runner.clearTransferState(); + runner.setProperty(ListFile.MIN_AGE, age0); + runner.setProperty(ListFile.MAX_AGE, age3); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(2, successFiles2.size()); + + // exclude newest + runner.clearTransferState(); + runner.setProperty(ListFile.MIN_AGE, age1); + runner.setProperty(ListFile.MAX_AGE, age5); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(2, successFiles3.size()); + + // exclude oldest and newest + runner.clearTransferState(); + runner.setProperty(ListFile.MIN_AGE, age1); + runner.setProperty(ListFile.MAX_AGE, age3); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(1, successFiles4.size()); + } + + @Test + public void testFilterSize() throws IOException { + final byte[] bytes1000 = new byte[1000]; + final byte[] bytes5000 = new byte[5000]; + final byte[] bytes10000 = new byte[10000]; + FileOutputStream fos; + + final File file1 = new File(TESTDIR + "/size1.txt"); + assertTrue(file1.createNewFile()); + fos = new FileOutputStream(file1); + fos.write(bytes10000); + fos.close(); + + final File file2 = new File(TESTDIR + "/size2.txt"); + assertTrue(file2.createNewFile()); + fos = new FileOutputStream(file2); + fos.write(bytes5000); + fos.close(); + + final File file3 = new File(TESTDIR + "/size3.txt"); + assertTrue(file3.createNewFile()); + fos = new FileOutputStream(file3); + fos.write(bytes1000); + fos.close(); + + // check all files + runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(3, successFiles1.size()); + + // exclude largest + runner.clearTransferState(); + runner.removeProperty(ListFile.MIN_AGE); + runner.removeProperty(ListFile.MAX_AGE); + runner.setProperty(ListFile.MIN_SIZE, "0 b"); + runner.setProperty(ListFile.MAX_SIZE, "7500 b"); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(2, successFiles2.size()); + + // exclude smallest + runner.clearTransferState(); + runner.removeProperty(ListFile.MIN_AGE); + runner.removeProperty(ListFile.MAX_AGE); + runner.setProperty(ListFile.MIN_SIZE, "2500 b"); + runner.removeProperty(ListFile.MAX_SIZE); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(2, successFiles3.size()); + + // exclude oldest and newest + runner.clearTransferState(); + runner.removeProperty(ListFile.MIN_AGE); + runner.removeProperty(ListFile.MAX_AGE); + runner.setProperty(ListFile.MIN_SIZE, "2500 b"); + runner.setProperty(ListFile.MAX_SIZE, "7500 b"); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles4 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(1, successFiles4.size()); + } + + @Test + public void testFilterHidden() throws IOException { + FileOutputStream fos; + + final File file1 = new File(TESTDIR + "/hidden1.txt"); + assertTrue(file1.createNewFile()); + fos = new FileOutputStream(file1); + fos.close(); + + final File file2 = new File(TESTDIR + "/.hidden2.txt"); + assertTrue(file2.createNewFile()); + fos = new FileOutputStream(file2); + fos.close(); + FileStore store = Files.getFileStore(file2.toPath()); + if (store.supportsFileAttributeView("dos")) { + Files.setAttribute(file2.toPath(), "dos:hidden", true); + } + + // check all files + runner.clearTransferState(); + runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); + runner.setProperty(ListFile.FILE_FILTER, ".*"); + runner.removeProperty(ListFile.MIN_AGE); + runner.removeProperty(ListFile.MAX_AGE); + runner.removeProperty(ListFile.MIN_SIZE); + runner.removeProperty(ListFile.MAX_SIZE); + runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "false"); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(2, successFiles1.size()); + + // exclude hidden + runner.clearTransferState(); + runner.setProperty(ListFile.IGNORE_HIDDEN_FILES, "true"); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(1, successFiles2.size()); + } + + @Test + public void testFilterFilePattern() throws IOException { + final File file1 = new File(TESTDIR + "/file1-abc-apple.txt"); + assertTrue(file1.createNewFile()); + + final File file2 = new File(TESTDIR + "/file2-xyz-apple.txt"); + assertTrue(file2.createNewFile()); + + final File file3 = new File(TESTDIR + "/file3-xyz-banana.txt"); + assertTrue(file3.createNewFile()); + + final File file4 = new File(TESTDIR + "/file4-pdq-banana.txt"); + assertTrue(file4.createNewFile()); + + // check all files + runner.clearTransferState(); + runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); + runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue()); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(4, successFiles1.size()); + + // filter file on pattern + runner.clearTransferState(); + runner.setProperty(ListFile.FILE_FILTER, ".*-xyz-.*"); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(2, successFiles2.size()); + } + + @Test + public void testFilterPathPattern() throws IOException { + final File subdir1 = new File(TESTDIR + "/subdir1"); + assertTrue(subdir1.mkdirs()); + + final File subdir2 = new File(TESTDIR + "/subdir1/subdir2"); + assertTrue(subdir2.mkdirs()); + + final File file1 = new File(TESTDIR + "/file1.txt"); + assertTrue(file1.createNewFile()); + + final File file2 = new File(TESTDIR + "/subdir1/file2.txt"); + assertTrue(file2.createNewFile()); + + final File file3 = new File(TESTDIR + "/subdir1/subdir2/file3.txt"); + assertTrue(file3.createNewFile()); + + final File file4 = new File(TESTDIR + "/subdir1/file4.txt"); + assertTrue(file4.createNewFile()); + + // check all files + runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); + runner.setProperty(ListFile.FILE_FILTER, ListFile.FILE_FILTER.getDefaultValue()); + runner.setProperty(ListFile.RECURSE, "true"); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(4, successFiles1.size()); + + // filter path on pattern subdir1 + runner.clearTransferState(); + runner.setProperty(ListFile.PATH_FILTER, "subdir1"); + runner.setProperty(ListFile.RECURSE, "true"); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(3, successFiles2.size()); + + // filter path on pattern subdir2 + runner.clearTransferState(); + runner.setProperty(ListFile.PATH_FILTER, "subdir2"); + runner.setProperty(ListFile.RECURSE, "true"); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles3 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(1, successFiles3.size()); + } + + @Test + public void testRecurse() throws IOException { + final File subdir1 = new File(TESTDIR + "/subdir1"); + assertTrue(subdir1.mkdirs()); + + final File subdir2 = new File(TESTDIR + "/subdir1/subdir2"); + assertTrue(subdir2.mkdirs()); + + final File file1 = new File(TESTDIR + "/file1.txt"); + assertTrue(file1.createNewFile()); + + final File file2 = new File(TESTDIR + "/subdir1/file2.txt"); + assertTrue(file2.createNewFile()); + + final File file3 = new File(TESTDIR + "/subdir1/subdir2/file3.txt"); + assertTrue(file3.createNewFile()); + + // check all files + runner.clearTransferState(); + runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); + runner.setProperty(ListFile.RECURSE, "true"); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(3, successFiles1.size()); + + // exclude hidden + runner.clearTransferState(); + runner.setProperty(ListFile.RECURSE, "false"); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(1, successFiles2.size()); + } + + @Test + public void testReadable() throws IOException { + final File file1 = new File(TESTDIR + "/file1.txt"); + assertTrue(file1.createNewFile()); + + final File file2 = new File(TESTDIR + "/file2.txt"); + assertTrue(file2.createNewFile()); + + final File file3 = new File(TESTDIR + "/file3.txt"); + assertTrue(file3.createNewFile()); + + // check all files + runner.clearTransferState(); + runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); + runner.setProperty(ListFile.RECURSE, "true"); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(3, successFiles1.size()); + + // make file2 unreadable and test (setReadable() does not work on Windows) + if (!System.getProperty("os.name").toLowerCase().startsWith("windows")) { + assertTrue(file2.setReadable(false)); + runner.clearTransferState(); + runner.setProperty(ListFile.RECURSE, "false"); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles2 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(2, successFiles2.size()); + } + } + + @Test + public void testAttributesSet() throws IOException { + // create temp file and time constant + final File file1 = new File(TESTDIR + "/file1.txt"); + assertTrue(file1.createNewFile()); + FileOutputStream fos = new FileOutputStream(file1); + fos.write(new byte[1234]); + fos.close(); + assertTrue(file1.setLastModified(time3millis)); + Long time3rounded = time3millis - time3millis % 1000; + String userName = System.getProperty("user.name"); + + // validate the file transferred + runner.clearTransferState(); + runner.setProperty(ListFile.DIRECTORY, testDir.getAbsolutePath()); + runner.run(); + runner.assertAllFlowFilesTransferred(ListFile.REL_SUCCESS); + final List successFiles1 = runner.getFlowFilesForRelationship(ListFile.REL_SUCCESS); + assertEquals(1, successFiles1.size()); + + // get attribute check values + final Path file1Path = file1.toPath(); + final Path directoryPath = new File(TESTDIR).toPath(); + final Path relativePath = directoryPath.relativize(file1.toPath().getParent()); + String relativePathString = relativePath.toString() + "/"; + final Path absolutePath = file1.toPath().toAbsolutePath(); + final String absolutePathString = absolutePath.getParent().toString() + "/"; + final FileStore store = Files.getFileStore(file1Path); + final DateFormat formatter = new SimpleDateFormat(ListFile.FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US); + final String time3Formatted = formatter.format(time3rounded); + + // check standard attributes + MockFlowFile mock1 = successFiles1.get(0); + assertEquals(relativePathString, mock1.getAttribute(CoreAttributes.PATH.key())); + assertEquals("file1.txt", mock1.getAttribute(CoreAttributes.FILENAME.key())); + assertEquals(absolutePathString, mock1.getAttribute(CoreAttributes.ABSOLUTE_PATH.key())); + assertEquals("1234", mock1.getAttribute(ListFile.FILE_SIZE_ATTRIBUTE)); + + // check attributes dependent on views supported + if (store.supportsFileAttributeView("basic")) { + assertEquals(time3Formatted, mock1.getAttribute(ListFile.FILE_LAST_MODIFY_TIME_ATTRIBUTE)); + assertNotNull(mock1.getAttribute(ListFile.FILE_CREATION_TIME_ATTRIBUTE)); + assertNotNull(mock1.getAttribute(ListFile.FILE_LAST_ACCESS_TIME_ATTRIBUTE)); + } + if (store.supportsFileAttributeView("owner")) { + // look for username containment to handle Windows domains as well as Unix user names + // org.junit.ComparisonFailure: expected:<[]username> but was:<[DOMAIN\]username> + assertTrue(mock1.getAttribute(ListFile.FILE_OWNER_ATTRIBUTE).contains(userName)); + } + if (store.supportsFileAttributeView("posix")) { + assertEquals(userName, mock1.getAttribute(ListFile.FILE_GROUP_ATTRIBUTE)); + assertEquals("rw-rw-r--", mock1.getAttribute(ListFile.FILE_PERMISSIONS_ATTRIBUTE)); + } + } + + @Test + public void testIsListingResetNecessary() throws Exception { + assertEquals(true, processor.isListingResetNecessary(ListFile.DIRECTORY)); + assertEquals(true, processor.isListingResetNecessary(ListFile.RECURSE)); + assertEquals(true, processor.isListingResetNecessary(ListFile.FILE_FILTER)); + assertEquals(true, processor.isListingResetNecessary(ListFile.PATH_FILTER)); + assertEquals(true, processor.isListingResetNecessary(ListFile.MIN_AGE)); + assertEquals(true, processor.isListingResetNecessary(ListFile.MAX_AGE)); + assertEquals(true, processor.isListingResetNecessary(ListFile.MIN_SIZE)); + assertEquals(true, processor.isListingResetNecessary(ListFile.MAX_SIZE)); + assertEquals(true, processor.isListingResetNecessary(ListFile.IGNORE_HIDDEN_FILES)); + assertEquals(false, processor.isListingResetNecessary(new PropertyDescriptor.Builder().name("x").build())); + } + + public void resetAges() { + syncTime = System.currentTimeMillis(); + + age0millis = 0L; + age1millis = 2000L; + age2millis = 5000L; + age3millis = 7000L; + age4millis = 10000L; + age5millis = 100000L; + + time0millis = syncTime - age0millis; + time1millis = syncTime - age1millis; + time2millis = syncTime - age2millis; + time3millis = syncTime - age3millis; + time4millis = syncTime - age4millis; + time5millis = syncTime - age5millis; + + age0 = Long.toString(age0millis) + " millis"; + age1 = Long.toString(age1millis) + " millis"; + age2 = Long.toString(age2millis) + " millis"; + age3 = Long.toString(age3millis) + " millis"; + age4 = Long.toString(age4millis) + " millis"; + age5 = Long.toString(age5millis) + " millis"; + } + + private void deleteDirectory(final File directory) throws IOException { + if (directory.exists()) { + File[] files = directory.listFiles(); + if (files != null) { + for (final File file : files) { + if (file.isDirectory()) { + deleteDirectory(file); + } + assertTrue("Could not delete " + file.getAbsolutePath(), file.delete()); + } + } + } + } + + private String perms2string(final String permissions) { + final StringBuilder sb = new StringBuilder(); + if (permissions.contains(PosixFilePermission.OWNER_READ.toString())) { + sb.append("r"); + } else { + sb.append("-"); + } + if (permissions.contains(PosixFilePermission.OWNER_WRITE.toString())) { + sb.append("w"); + } else { + sb.append("-"); + } + if (permissions.contains(PosixFilePermission.OWNER_EXECUTE.toString())) { + sb.append("x"); + } else { + sb.append("-"); + } + if (permissions.contains(PosixFilePermission.GROUP_READ.toString())) { + sb.append("r"); + } else { + sb.append("-"); + } + if (permissions.contains(PosixFilePermission.GROUP_WRITE.toString())) { + sb.append("w"); + } else { + sb.append("-"); + } + if (permissions.contains(PosixFilePermission.GROUP_EXECUTE.toString())) { + sb.append("x"); + } else { + sb.append("-"); + } + if (permissions.contains(PosixFilePermission.OTHERS_READ.toString())) { + sb.append("r"); + } else { + sb.append("-"); + } + if (permissions.contains(PosixFilePermission.OTHERS_WRITE.toString())) { + sb.append("w"); + } else { + sb.append("-"); + } + if (permissions.contains(PosixFilePermission.OTHERS_EXECUTE.toString())) { + sb.append("x"); + } else { + sb.append("-"); + } + return sb.toString(); + } +}