From 6e8b1c8f74b957eaef04d1fa7574c29d9c3acbe0 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Wed, 18 Apr 2018 11:14:41 +0900 Subject: [PATCH] NIFI-5090: Create move target dir dynamically at FetchFTP and FetchSFTP Support creating target parent directories even if directory listing is disabled. fixed typo in doc Signed-off-by: Matthew Burgess This closes #2642 --- .../nifi/processors/standard/FetchFTP.java | 6 +- .../standard/FetchFileTransfer.java | 32 ++- .../nifi/processors/standard/FetchSFTP.java | 10 + .../processors/standard/PutFileTransfer.java | 6 +- .../standard/util/FileTransfer.java | 16 ++ .../standard/util/SFTPTransfer.java | 66 ++--- .../standard/TestFetchFileTransfer.java | 30 ++- .../standard/util/TestSFTPTransfer.java | 247 ++++++++++++++++++ 8 files changed, 362 insertions(+), 51 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java index 6f3f84d04f..488627445b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java @@ -29,11 +29,6 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.standard.util.FileTransfer; -import org.apache.nifi.processors.standard.FetchFileTransfer; -import org.apache.nifi.processors.standard.GetFTP; -import org.apache.nifi.processors.standard.GetSFTP; -import org.apache.nifi.processors.standard.PutFTP; -import org.apache.nifi.processors.standard.PutSFTP; import org.apache.nifi.processors.standard.util.FTPTransfer; // Note that we do not use @SupportsBatching annotation. This processor cannot support batching because it must ensure that session commits happen before remote files are deleted. @@ -62,6 +57,7 @@ public class FetchFTP extends FetchFileTransfer { properties.add(REMOTE_FILENAME); properties.add(COMPLETION_STRATEGY); properties.add(MOVE_DESTINATION_DIR); + properties.add(MOVE_CREATE_DIRECTORY); properties.add(FTPTransfer.CONNECTION_TIMEOUT); properties.add(FTPTransfer.DATA_TIMEOUT); properties.add(FTPTransfer.USE_COMPRESSION); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java index 3ba0066a10..c44a406c48 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java @@ -37,6 +37,7 @@ import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.Tuple; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -101,11 +102,19 @@ public abstract class FetchFileTransfer extends AbstractProcessor { .defaultValue(COMPLETION_NONE.getValue()) .required(true) .build(); + static final PropertyDescriptor MOVE_CREATE_DIRECTORY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(FileTransfer.CREATE_DIRECTORY).description(String.format("Used when '%s' is '%s'. %s", + COMPLETION_STRATEGY.getDisplayName(), + COMPLETION_MOVE.getDisplayName(), + FileTransfer.CREATE_DIRECTORY.getDescription())) + .required(false) + .build(); static final PropertyDescriptor MOVE_DESTINATION_DIR = new PropertyDescriptor.Builder() .name("Move Destination Directory") - .description("The directory on the remote server to the move the original file to once it has been ingested into NiFi. " - + "This property is ignored unless the Completion Strategy is set to \"Move File\". The specified directory must already exist on" - + "the remote system, or the rename will fail.") + .description(String.format("The directory on the remote server to move the original file to once it has been ingested into NiFi. " + + "This property is ignored unless the %s is set to '%s'. The specified directory must already exist on " + + "the remote system if '%s' is disabled, or the rename will fail.", + COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), MOVE_CREATE_DIRECTORY.getDisplayName())) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .required(false) @@ -189,6 +198,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor { properties.add(REMOTE_FILENAME); properties.add(COMPLETION_STRATEGY); properties.add(MOVE_DESTINATION_DIR); + properties.add(MOVE_CREATE_DIRECTORY); return properties; } @@ -308,15 +318,19 @@ public abstract class FetchFileTransfer extends AbstractProcessor { new Object[]{flowFile, host, port, filename, ioe}, ioe); } } else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) { - String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue(); - if (!targetDir.endsWith("/")) { - targetDir = targetDir + "/"; - } + final String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue(); final String simpleFilename = StringUtils.substringAfterLast(filename, "/"); - final String target = targetDir + simpleFilename; try { - transfer.rename(flowFile, filename, target); + final String absoluteTargetDirPath = transfer.getAbsolutePath(flowFile, targetDir); + final File targetFile = new File(absoluteTargetDirPath, simpleFilename); + if (context.getProperty(MOVE_CREATE_DIRECTORY).asBoolean()) { + // Create the target directory if necessary. + transfer.ensureDirectoryExists(flowFile, targetFile.getParentFile()); + } + + transfer.rename(flowFile, filename, targetFile.getAbsolutePath()); + } catch (final IOException ioe) { getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}", new Object[]{flowFile, host, port, filename, ioe}, ioe); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java index 79eb1e617a..68465579f9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java @@ -49,6 +49,14 @@ public class FetchSFTP extends FetchFileTransfer { @Override protected List getSupportedPropertyDescriptors() { final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("22").build(); + final PropertyDescriptor disableDirectoryListing = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(SFTPTransfer.DISABLE_DIRECTORY_LISTING) + .description(String.format("Control how '%s' is created when '%s' is '%s' and '%s' is enabled. %s", + MOVE_DESTINATION_DIR.getDisplayName(), + COMPLETION_STRATEGY.getDisplayName(), + COMPLETION_MOVE.getDisplayName(), + MOVE_CREATE_DIRECTORY.getDisplayName(), + SFTPTransfer.DISABLE_DIRECTORY_LISTING.getDescription())).build(); final List properties = new ArrayList<>(); properties.add(HOSTNAME); @@ -60,6 +68,8 @@ public class FetchSFTP extends FetchFileTransfer { properties.add(REMOTE_FILENAME); properties.add(COMPLETION_STRATEGY); properties.add(MOVE_DESTINATION_DIR); + properties.add(MOVE_CREATE_DIRECTORY); + properties.add(disableDirectoryListing); properties.add(SFTPTransfer.CONNECTION_TIMEOUT); properties.add(SFTPTransfer.DATA_TIMEOUT); properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java index cbaa9ecaab..12eafdb69e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java @@ -106,11 +106,7 @@ public abstract class PutFileTransfer extends AbstractPr if (rootPath == null) { workingDirPath = null; } else { - File workingDirectory = new File(rootPath); - if (!workingDirectory.getPath().startsWith("/") && !workingDirectory.getPath().startsWith("\\")) { - workingDirectory = new File(transfer.getHomeDirectory(flowFile), workingDirectory.getPath()); - } - workingDirPath = workingDirectory.getPath().replace("\\", "/"); + workingDirPath = transfer.getAbsolutePath(flowFile, rootPath); } final boolean rejectZeroByteFiles = context.getProperty(FileTransfer.REJECT_ZERO_BYTE).asBoolean(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java index 1163ea8f7d..d500b9d6a0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FileTransfer.java @@ -58,6 +58,22 @@ public interface FileTransfer extends Closeable { void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException; + /** + * Compute an absolute file path for the given remote path. + * @param flowFile is used to setup file transfer client with its attribute values, to get user home directory + * @param remotePath the target remote path + * @return The absolute path for the given remote path + */ + default String getAbsolutePath(FlowFile flowFile, String remotePath) throws IOException { + final String absoluteRemotePath; + if (!remotePath.startsWith("/") && !remotePath.startsWith("\\")) { + absoluteRemotePath = new File(getHomeDirectory(flowFile), remotePath).getPath(); + } else { + absoluteRemotePath = remotePath; + } + return absoluteRemotePath.replace("\\", "/"); + } + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() .name("Hostname") .description("The fully qualified hostname or IP address of the remote system") diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java index d7aa6e312a..c11a53b9cf 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java @@ -106,7 +106,7 @@ public class SFTPTransfer implements FileTransfer { .description("If set to 'true', directory listing is not performed prior to create missing directories." + " By default, this processor executes a directory listing command" + " to see target directory existence before creating missing directories." + - " However, there are situations that you might need to disable the directory listing such as followings." + + " However, there are situations that you might need to disable the directory listing such as the following." + " Directory listing might fail with some permission setups (e.g. chmod 100) on a directory." + " Also, if any other SFTP client created the directory after this processor performed a listing" + " and before a directory creation request by this processor is finished," + @@ -353,48 +353,52 @@ public class SFTPTransfer implements FileTransfer { final String remoteDirectory = directoryName.getAbsolutePath().replace("\\", "/").replaceAll("^.\\:", ""); // if we disable the directory listing, we just want to blindly perform the mkdir command, - // eating any exceptions thrown (like if the directory already exists). + // eating failure exceptions thrown (like if the directory already exists). if (disableDirectoryListing) { try { + // Blindly create the dir. channel.mkdir(remoteDirectory); + // The remote directory did not exist, and was created successfully. + return; } catch (SftpException e) { - if (e.id != ChannelSftp.SSH_FX_FAILURE) { + if (e.id == ChannelSftp.SSH_FX_NO_SUCH_FILE) { + // No Such File. This happens when parent directory was not found. + logger.debug(String.format("Could not create %s due to 'No such file'. Will try to create the parent dir.", remoteDirectory)); + } else if (e.id == ChannelSftp.SSH_FX_FAILURE) { + // Swallow '4: Failure' including the remote directory already exists. + logger.debug("Could not blindly create remote directory due to " + e.getMessage(), e); + return; + } else { throw new IOException("Could not blindly create remote directory due to " + e.getMessage(), e); } } - return; - } - // end if disableDirectoryListing - - boolean exists; - try { - channel.stat(remoteDirectory); - exists = true; - } catch (final SftpException e) { - if (e.id == ChannelSftp.SSH_FX_NO_SUCH_FILE) { - // No Such File - exists = false; - } else { - throw new IOException("Failed to determine if remote directory exists at " + remoteDirectory + " due to " + e, e); - } - } - - if (!exists) { - // first ensure parent directories exist before creating this one - if (directoryName.getParent() != null && !directoryName.getParentFile().equals(new File(File.separator))) { - ensureDirectoryExists(flowFile, directoryName.getParentFile()); - } - logger.debug("Remote Directory {} does not exist; creating it", new Object[] {remoteDirectory}); + } else { try { - channel.mkdir(remoteDirectory); - logger.debug("Created {}", new Object[] {remoteDirectory}); + // Check dir existence. + channel.stat(remoteDirectory); + // The remote directory already exists. + return; } catch (final SftpException e) { - throw new IOException("Failed to create remote directory " + remoteDirectory + " due to " + e, e); + if (e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE) { + throw new IOException("Failed to determine if remote directory exists at " + remoteDirectory + " due to " + e, e); + } } } + + // first ensure parent directories exist before creating this one + if (directoryName.getParent() != null && !directoryName.getParentFile().equals(new File(File.separator))) { + ensureDirectoryExists(flowFile, directoryName.getParentFile()); + } + logger.debug("Remote Directory {} does not exist; creating it", new Object[] {remoteDirectory}); + try { + channel.mkdir(remoteDirectory); + logger.debug("Created {}", new Object[] {remoteDirectory}); + } catch (final SftpException e) { + throw new IOException("Failed to create remote directory " + remoteDirectory + " due to " + e, e); + } } - private ChannelSftp getChannel(final FlowFile flowFile) throws IOException { + protected ChannelSftp getChannel(final FlowFile flowFile) throws IOException { if (sftp != null) { String sessionhost = session.getHost(); String desthost = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(); @@ -633,7 +637,7 @@ public class SFTPTransfer implements FileTransfer { } catch (final SftpException e) { switch (e.id) { case ChannelSftp.SSH_FX_NO_SUCH_FILE: - throw new FileNotFoundException(); + throw new FileNotFoundException("No such file or directory"); case ChannelSftp.SSH_FX_PERMISSION_DENIED: throw new PermissionDeniedException("Could not rename remote file " + source + " to " + target + " due to insufficient permissions"); default: diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java index 4965893e36..de76b07fe9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFileTransfer.java @@ -130,6 +130,7 @@ public class TestFetchFileTransfer { runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}"); runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue()); runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved"); + runner.setProperty(FetchFileTransfer.MOVE_CREATE_DIRECTORY, "true"); proc.addContent("hello.txt", "world".getBytes()); final Map attrs = new HashMap<>(); @@ -228,10 +229,35 @@ public class TestFetchFileTransfer { assertTrue(proc.fileContents.containsKey("hello.txt")); } + @Test + public void testCreateDirFails() { + final TestableFetchFileTransfer proc = new TestableFetchFileTransfer(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost"); + runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11"); + runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}"); + runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue()); + runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/"); + runner.setProperty(FetchFileTransfer.MOVE_CREATE_DIRECTORY, "true"); + + proc.addContent("hello.txt", "world".getBytes()); + final Map attrs = new HashMap<>(); + attrs.put("filename", "hello.txt"); + runner.enqueue(new byte[0], attrs); + proc.allowCreateDir = false; + + runner.run(1, false, false); + runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1); + assertEquals(1, proc.fileContents.size()); + + assertTrue(proc.fileContents.containsKey("hello.txt")); + } + private static class TestableFetchFileTransfer extends FetchFileTransfer { private boolean allowAccess = true; private boolean allowDelete = true; + private boolean allowCreateDir = true; private boolean allowRename = true; private boolean closed = false; private final Map fileContents = new HashMap<>(); @@ -340,7 +366,9 @@ public class TestFetchFileTransfer { @Override public void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException { - + if (!allowCreateDir) { + throw new PermissionDeniedException("test permission denied"); + } } }; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java new file mode 100644 index 0000000000..9a17838f4a --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestSFTPTransfer.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import com.jcraft.jsch.ChannelSftp; +import com.jcraft.jsch.SftpException; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockPropertyValue; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.jcraft.jsch.ChannelSftp.SSH_FX_FAILURE; +import static com.jcraft.jsch.ChannelSftp.SSH_FX_NO_SUCH_FILE; +import static com.jcraft.jsch.ChannelSftp.SSH_FX_PERMISSION_DENIED; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestSFTPTransfer { + + private static final Logger logger = LoggerFactory.getLogger(TestSFTPTransfer.class); + + private SFTPTransfer createSftpTransfer(ProcessContext processContext, ChannelSftp channel) { + final ComponentLog componentLog = mock(ComponentLog.class); + return new SFTPTransfer(processContext, componentLog) { + @Override + protected ChannelSftp getChannel(FlowFile flowFile) throws IOException { + return channel; + } + }; + } + + @Test + public void testEnsureDirectoryExistsAlreadyExisted() throws IOException, SftpException { + final ProcessContext processContext = mock(ProcessContext.class); + final ChannelSftp channel = mock(ChannelSftp.class); + final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel); + final MockFlowFile flowFile = new MockFlowFile(0); + final File remoteDir = new File("/dir1/dir2/dir3"); + sftpTransfer.ensureDirectoryExists(flowFile, remoteDir); + + // Dir existence check should be done by stat + verify(channel).stat(eq("/dir1/dir2/dir3")); + } + + @Test + public void testEnsureDirectoryExistsFailedToStat() throws IOException, SftpException { + final ProcessContext processContext = mock(ProcessContext.class); + final ChannelSftp channel = mock(ChannelSftp.class); + // stat for the parent was successful, simulating that dir2 exists, but no dir3. + when(channel.stat("/dir1/dir2/dir3")).thenThrow(new SftpException(SSH_FX_FAILURE, "Failure")); + + final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel); + final MockFlowFile flowFile = new MockFlowFile(0); + final File remoteDir = new File("/dir1/dir2/dir3"); + try { + sftpTransfer.ensureDirectoryExists(flowFile, remoteDir); + fail("Should fail"); + } catch (IOException e) { + assertEquals("Failed to determine if remote directory exists at /dir1/dir2/dir3 due to 4: Failure", e.getMessage()); + } + + // Dir existence check should be done by stat + verify(channel).stat(eq("/dir1/dir2/dir3")); + } + + @Test + public void testEnsureDirectoryExistsNotExisted() throws IOException, SftpException { + final ProcessContext processContext = mock(ProcessContext.class); + final ChannelSftp channel = mock(ChannelSftp.class); + // stat for the parent was successful, simulating that dir2 exists, but no dir3. + when(channel.stat("/dir1/dir2/dir3")).thenThrow(new SftpException(SSH_FX_NO_SUCH_FILE, "No such file")); + + final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel); + final MockFlowFile flowFile = new MockFlowFile(0); + final File remoteDir = new File("/dir1/dir2/dir3"); + sftpTransfer.ensureDirectoryExists(flowFile, remoteDir); + + // Dir existence check should be done by stat + verify(channel).stat(eq("/dir1/dir2/dir3")); // dir3 was not found + verify(channel).stat(eq("/dir1/dir2")); // so, dir2 was checked + verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir2 existed, so dir3 was created. + } + + @Test + public void testEnsureDirectoryExistsParentNotExisted() throws IOException, SftpException { + final ProcessContext processContext = mock(ProcessContext.class); + final ChannelSftp channel = mock(ChannelSftp.class); + // stat for the dir1 was successful, simulating that dir1 exists, but no dir2 and dir3. + when(channel.stat("/dir1/dir2/dir3")).thenThrow(new SftpException(SSH_FX_NO_SUCH_FILE, "No such file")); + when(channel.stat("/dir1/dir2")).thenThrow(new SftpException(SSH_FX_NO_SUCH_FILE, "No such file")); + + final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel); + final MockFlowFile flowFile = new MockFlowFile(0); + final File remoteDir = new File("/dir1/dir2/dir3"); + sftpTransfer.ensureDirectoryExists(flowFile, remoteDir); + + // Dir existence check should be done by stat + verify(channel).stat(eq("/dir1/dir2/dir3")); // dir3 was not found + verify(channel).stat(eq("/dir1/dir2")); // dir2 was not found, too + verify(channel).stat(eq("/dir1")); // dir1 was found + verify(channel).mkdir(eq("/dir1/dir2")); // dir1 existed, so dir2 was created. + verify(channel).mkdir(eq("/dir1/dir2/dir3")); // then dir3 was created. + } + + @Test + public void testEnsureDirectoryExistsNotExistedFailedToCreate() throws IOException, SftpException { + final ProcessContext processContext = mock(ProcessContext.class); + final ChannelSftp channel = mock(ChannelSftp.class); + // stat for the parent was successful, simulating that dir2 exists, but no dir3. + when(channel.stat("/dir1/dir2/dir3")).thenThrow(new SftpException(SSH_FX_NO_SUCH_FILE, "No such file")); + // Failed to create dir3. + doThrow(new SftpException(SSH_FX_FAILURE, "Failed")).when(channel).mkdir(eq("/dir1/dir2/dir3")); + + final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel); + final MockFlowFile flowFile = new MockFlowFile(0); + final File remoteDir = new File("/dir1/dir2/dir3"); + try { + sftpTransfer.ensureDirectoryExists(flowFile, remoteDir); + fail("Should fail"); + } catch (IOException e) { + assertEquals("Failed to create remote directory /dir1/dir2/dir3 due to 4: Failed", e.getMessage()); + } + + // Dir existence check should be done by stat + verify(channel).stat(eq("/dir1/dir2/dir3")); // dir3 was not found + verify(channel).stat(eq("/dir1/dir2")); // so, dir2 was checked + verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir2 existed, so dir3 was created. + } + + @Test + public void testEnsureDirectoryExistsBlindlyNotExisted() throws IOException, SftpException { + final ProcessContext processContext = mock(ProcessContext.class); + when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true")); + + final ChannelSftp channel = mock(ChannelSftp.class); + final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel); + final MockFlowFile flowFile = new MockFlowFile(0); + final File remoteDir = new File("/dir1/dir2/dir3"); + sftpTransfer.ensureDirectoryExists(flowFile, remoteDir); + + // stat should not be called. + verify(channel, times(0)).stat(eq("/dir1/dir2/dir3")); + verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir3 was created blindly. + } + + @Test + public void testEnsureDirectoryExistsBlindlyParentNotExisted() throws IOException, SftpException { + final ProcessContext processContext = mock(ProcessContext.class); + when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true")); + + final ChannelSftp channel = mock(ChannelSftp.class); + final AtomicInteger mkdirCount = new AtomicInteger(0); + doAnswer(invocation -> { + final int cnt = mkdirCount.getAndIncrement(); + if (cnt == 0) { + // If the parent dir does not exist, no such file exception is thrown. + throw new SftpException(SSH_FX_NO_SUCH_FILE, "Failure"); + } else { + logger.info("Created the dir successfully for the 2nd time"); + } + return true; + }).when(channel).mkdir(eq("/dir1/dir2/dir3")); + + final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel); + final MockFlowFile flowFile = new MockFlowFile(0); + final File remoteDir = new File("/dir1/dir2/dir3"); + sftpTransfer.ensureDirectoryExists(flowFile, remoteDir); + + // stat should not be called. + verify(channel, times(0)).stat(eq("/dir1/dir2/dir3")); + // dir3 was created blindly, but failed for the 1st time, and succeeded for the 2nd time. + verify(channel, times(2)).mkdir(eq("/dir1/dir2/dir3")); + verify(channel).mkdir(eq("/dir1/dir2")); // dir2 was created successfully. + } + + @Test + public void testEnsureDirectoryExistsBlindlyAlreadyExisted() throws IOException, SftpException { + final ProcessContext processContext = mock(ProcessContext.class); + when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true")); + + final ChannelSftp channel = mock(ChannelSftp.class); + // If the dir existed, a failure exception is thrown, but should be swallowed. + doThrow(new SftpException(SSH_FX_FAILURE, "Failure")).when(channel).mkdir(eq("/dir1/dir2/dir3")); + + final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel); + final MockFlowFile flowFile = new MockFlowFile(0); + final File remoteDir = new File("/dir1/dir2/dir3"); + sftpTransfer.ensureDirectoryExists(flowFile, remoteDir); + + // stat should not be called. + verify(channel, times(0)).stat(eq("/dir1/dir2/dir3")); + verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir3 was created blindly. + } + + @Test + public void testEnsureDirectoryExistsBlindlyFailed() throws IOException, SftpException { + final ProcessContext processContext = mock(ProcessContext.class); + when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true")); + + final ChannelSftp channel = mock(ChannelSftp.class); + doThrow(new SftpException(SSH_FX_PERMISSION_DENIED, "Permission denied")).when(channel).mkdir(eq("/dir1/dir2/dir3")); + + final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel); + final MockFlowFile flowFile = new MockFlowFile(0); + final File remoteDir = new File("/dir1/dir2/dir3"); + try { + sftpTransfer.ensureDirectoryExists(flowFile, remoteDir); + fail("Should fail"); + } catch (IOException e) { + assertEquals("Could not blindly create remote directory due to Permission denied", e.getMessage()); + } + + // stat should not be called. + verify(channel, times(0)).stat(eq("/dir1/dir2/dir3")); + verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir3 was created blindly. + } + +}