diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java index 0c95c758d0..8784c75de7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java @@ -49,6 +49,7 @@ import java.nio.file.attribute.PosixFilePermissions; import java.nio.file.attribute.UserPrincipalLookupService; import java.text.DateFormat; import java.text.SimpleDateFormat; +import java.util.Arrays; import java.util.ArrayList; import java.util.Collections; import java.util.Date; @@ -203,8 +204,9 @@ public class PutFile extends AbstractProcessor { Path tempDotCopyFile = null; try { final Path rootDirPath = configuredRootDirPath; - final Path tempCopyFile = rootDirPath.resolve("." + flowFile.getAttribute(CoreAttributes.FILENAME.key())); - final Path copyFile = rootDirPath.resolve(flowFile.getAttribute(CoreAttributes.FILENAME.key())); + String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key()); + final Path tempCopyFile = rootDirPath.resolve("." + filename); + final Path copyFile = rootDirPath.resolve(filename); if (!Files.exists(rootDirPath)) { if (context.getProperty(CREATE_DIRS).asBoolean()) { @@ -224,7 +226,7 @@ public class PutFile extends AbstractProcessor { final Path finalCopyFileDir = finalCopyFile.getParent(); if (Files.exists(finalCopyFileDir) && maxDestinationFiles != null) { // check if too many files already - final int numFiles = finalCopyFileDir.toFile().list().length; + final long numFiles = getFilesNumberInFolder(finalCopyFileDir, filename); if (numFiles >= maxDestinationFiles) { flowFile = session.penalize(flowFile); @@ -336,6 +338,13 @@ public class PutFile extends AbstractProcessor { } } + private long getFilesNumberInFolder(Path folder, String filename) { + String[] filesInFolder = folder.toFile().list(); + return Arrays.stream(filesInFolder) + .filter(eachFilename -> !eachFilename.equals(filename)) + .count(); + } + protected String stringPermissions(String perms) { String permissions = ""; final Pattern rwxPattern = Pattern.compile("^[rwx-]{9}$"); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutFile.java new file mode 100644 index 0000000000..51ad7c0362 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutFile.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class TestPutFile { + + public static final String TARGET_DIRECTORY = "target/put-file"; + private File targetDir; + + @Before + public void prepDestDirectory() throws IOException { + targetDir = new File(TARGET_DIRECTORY); + if (!targetDir.exists()) { + Files.createDirectories(targetDir.toPath()); + return; + } + + targetDir.setReadable(true); + + deleteDirectoryContent(targetDir); + } + + private void deleteDirectoryContent(File directory) throws IOException { + for (final File file : directory.listFiles()) { + if (file.isDirectory()) { + deleteDirectoryContent(file); + } + Files.delete(file.toPath()); + } + } + + @Test + public void testCreateDirectory() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new PutFile()); + String newDir = targetDir.getAbsolutePath()+"/new-folder"; + runner.setProperty(PutFile.DIRECTORY, newDir); + runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.REPLACE_RESOLUTION); + + Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt"); + runner.enqueue("Hello world!!".getBytes(), attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1); + Path targetPath = Paths.get(TARGET_DIRECTORY + "/new-folder/targetFile.txt"); + byte[] content = Files.readAllBytes(targetPath); + assertEquals("Hello world!!", new String(content)); + } + + @Test + public void testReplaceConflictResolution() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new PutFile()); + runner.setProperty(PutFile.DIRECTORY, targetDir.getAbsolutePath()); + runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.REPLACE_RESOLUTION); + + Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt"); + runner.enqueue("Hello world!!".getBytes(), attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1); + Path targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt"); + byte[] content = Files.readAllBytes(targetPath); + assertEquals("Hello world!!", new String(content)); + + //Second file + attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt"); + runner.enqueue("Another file".getBytes(), attributes); + runner.run(); + runner.assertTransferCount(FetchFile.REL_SUCCESS, 2); + File dir = new File(TARGET_DIRECTORY); + assertEquals(1, dir.list().length); + targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt"); + content = Files.readAllBytes(targetPath); + assertEquals("Another file", new String(content)); + } + + @Test + public void testIgnoreConflictResolution() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new PutFile()); + runner.setProperty(PutFile.DIRECTORY, targetDir.getAbsolutePath()); + runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.IGNORE_RESOLUTION); + + Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt"); + runner.enqueue("Hello world!!".getBytes(), attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1); + Path targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt"); + byte[] content = Files.readAllBytes(targetPath); + assertEquals("Hello world!!", new String(content)); + + //Second file + attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt"); + runner.enqueue("Another file".getBytes(), attributes); + runner.run(); + runner.assertTransferCount(FetchFile.REL_SUCCESS, 2); + File dir = new File(TARGET_DIRECTORY); + assertEquals(1, dir.list().length); + targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt"); + content = Files.readAllBytes(targetPath); + assertEquals("Hello world!!", new String(content)); + } + + @Test + public void testFailConflictResolution() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new PutFile()); + runner.setProperty(PutFile.DIRECTORY, targetDir.getAbsolutePath()); + runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.FAIL_RESOLUTION); + + Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt"); + runner.enqueue("Hello world!!".getBytes(), attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1); + Path targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt"); + byte[] content = Files.readAllBytes(targetPath); + assertEquals("Hello world!!", new String(content)); + + //Second file + attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt"); + runner.enqueue("Another file".getBytes(), attributes); + runner.run(); + runner.assertTransferCount(PutFile.REL_SUCCESS, 1); + runner.assertTransferCount(PutFile.REL_FAILURE, 1); + runner.assertPenalizeCount(1); + } + + @Test + public void testMaxFileLimitReach() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new PutFile()); + runner.setProperty(PutFile.DIRECTORY, targetDir.getAbsolutePath()); + runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.REPLACE_RESOLUTION); + runner.setProperty(PutFile.MAX_DESTINATION_FILES, "1"); + + Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt"); + runner.enqueue("Hello world!!".getBytes(), attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1); + Path targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt"); + byte[] content = Files.readAllBytes(targetPath); + assertEquals("Hello world!!", new String(content)); + + //Second file + attributes.put(CoreAttributes.FILENAME.key(), "secondFile.txt"); + runner.enqueue("Hello world!!".getBytes(), attributes); + runner.run(); + runner.assertTransferCount(PutFile.REL_FAILURE, 1); + runner.assertPenalizeCount(1); + } + + @Test + public void testReplaceAndMaxFileLimitReach() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new PutFile()); + runner.setProperty(PutFile.DIRECTORY, targetDir.getAbsolutePath()); + runner.setProperty(PutFile.CONFLICT_RESOLUTION, PutFile.REPLACE_RESOLUTION); + runner.setProperty(PutFile.MAX_DESTINATION_FILES, "1"); + + Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt"); + runner.enqueue("Hello world!!".getBytes(), attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_SUCCESS, 1); + Path targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt"); + byte[] content = Files.readAllBytes(targetPath); + assertEquals("Hello world!!", new String(content)); + + //Second file + attributes.put(CoreAttributes.FILENAME.key(), "targetFile.txt"); + runner.enqueue("Another file".getBytes(), attributes); + runner.run(); + runner.assertTransferCount(FetchFile.REL_SUCCESS, 2); + File dir = new File(TARGET_DIRECTORY); + assertEquals(1, dir.list().length); + targetPath = Paths.get(TARGET_DIRECTORY+"/targetFile.txt"); + content = Files.readAllBytes(targetPath); + assertEquals("Another file", new String(content)); + } + +}