mirror of https://github.com/apache/nifi.git
NIFI-5090: Create move target dir dynamically at FetchFTP and FetchSFTP
Support creating target parent directories even if directory listing is disabled. fixed typo in doc Signed-off-by: Matthew Burgess <mattyb149@apache.org> This closes #2642
This commit is contained in:
parent
159b64b4c8
commit
6e8b1c8f74
|
@ -29,11 +29,6 @@ import org.apache.nifi.annotation.documentation.Tags;
|
|||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processors.standard.util.FileTransfer;
|
||||
import org.apache.nifi.processors.standard.FetchFileTransfer;
|
||||
import org.apache.nifi.processors.standard.GetFTP;
|
||||
import org.apache.nifi.processors.standard.GetSFTP;
|
||||
import org.apache.nifi.processors.standard.PutFTP;
|
||||
import org.apache.nifi.processors.standard.PutSFTP;
|
||||
import org.apache.nifi.processors.standard.util.FTPTransfer;
|
||||
|
||||
// Note that we do not use @SupportsBatching annotation. This processor cannot support batching because it must ensure that session commits happen before remote files are deleted.
|
||||
|
@ -62,6 +57,7 @@ public class FetchFTP extends FetchFileTransfer {
|
|||
properties.add(REMOTE_FILENAME);
|
||||
properties.add(COMPLETION_STRATEGY);
|
||||
properties.add(MOVE_DESTINATION_DIR);
|
||||
properties.add(MOVE_CREATE_DIRECTORY);
|
||||
properties.add(FTPTransfer.CONNECTION_TIMEOUT);
|
||||
properties.add(FTPTransfer.DATA_TIMEOUT);
|
||||
properties.add(FTPTransfer.USE_COMPRESSION);
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.nifi.stream.io.StreamUtils;
|
|||
import org.apache.nifi.util.StopWatch;
|
||||
import org.apache.nifi.util.Tuple;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
@ -101,11 +102,19 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
|
|||
.defaultValue(COMPLETION_NONE.getValue())
|
||||
.required(true)
|
||||
.build();
|
||||
static final PropertyDescriptor MOVE_CREATE_DIRECTORY = new PropertyDescriptor.Builder()
|
||||
.fromPropertyDescriptor(FileTransfer.CREATE_DIRECTORY).description(String.format("Used when '%s' is '%s'. %s",
|
||||
COMPLETION_STRATEGY.getDisplayName(),
|
||||
COMPLETION_MOVE.getDisplayName(),
|
||||
FileTransfer.CREATE_DIRECTORY.getDescription()))
|
||||
.required(false)
|
||||
.build();
|
||||
static final PropertyDescriptor MOVE_DESTINATION_DIR = new PropertyDescriptor.Builder()
|
||||
.name("Move Destination Directory")
|
||||
.description("The directory on the remote server to the move the original file to once it has been ingested into NiFi. "
|
||||
+ "This property is ignored unless the Completion Strategy is set to \"Move File\". The specified directory must already exist on"
|
||||
+ "the remote system, or the rename will fail.")
|
||||
.description(String.format("The directory on the remote server to move the original file to once it has been ingested into NiFi. "
|
||||
+ "This property is ignored unless the %s is set to '%s'. The specified directory must already exist on "
|
||||
+ "the remote system if '%s' is disabled, or the rename will fail.",
|
||||
COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), MOVE_CREATE_DIRECTORY.getDisplayName()))
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.required(false)
|
||||
|
@ -189,6 +198,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
|
|||
properties.add(REMOTE_FILENAME);
|
||||
properties.add(COMPLETION_STRATEGY);
|
||||
properties.add(MOVE_DESTINATION_DIR);
|
||||
properties.add(MOVE_CREATE_DIRECTORY);
|
||||
return properties;
|
||||
}
|
||||
|
||||
|
@ -308,15 +318,19 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
|
|||
new Object[]{flowFile, host, port, filename, ioe}, ioe);
|
||||
}
|
||||
} else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
|
||||
String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
|
||||
if (!targetDir.endsWith("/")) {
|
||||
targetDir = targetDir + "/";
|
||||
}
|
||||
final String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
|
||||
final String simpleFilename = StringUtils.substringAfterLast(filename, "/");
|
||||
final String target = targetDir + simpleFilename;
|
||||
|
||||
try {
|
||||
transfer.rename(flowFile, filename, target);
|
||||
final String absoluteTargetDirPath = transfer.getAbsolutePath(flowFile, targetDir);
|
||||
final File targetFile = new File(absoluteTargetDirPath, simpleFilename);
|
||||
if (context.getProperty(MOVE_CREATE_DIRECTORY).asBoolean()) {
|
||||
// Create the target directory if necessary.
|
||||
transfer.ensureDirectoryExists(flowFile, targetFile.getParentFile());
|
||||
}
|
||||
|
||||
transfer.rename(flowFile, filename, targetFile.getAbsolutePath());
|
||||
|
||||
} catch (final IOException ioe) {
|
||||
getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}",
|
||||
new Object[]{flowFile, host, port, filename, ioe}, ioe);
|
||||
|
|
|
@ -49,6 +49,14 @@ public class FetchSFTP extends FetchFileTransfer {
|
|||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("22").build();
|
||||
final PropertyDescriptor disableDirectoryListing = new PropertyDescriptor.Builder()
|
||||
.fromPropertyDescriptor(SFTPTransfer.DISABLE_DIRECTORY_LISTING)
|
||||
.description(String.format("Control how '%s' is created when '%s' is '%s' and '%s' is enabled. %s",
|
||||
MOVE_DESTINATION_DIR.getDisplayName(),
|
||||
COMPLETION_STRATEGY.getDisplayName(),
|
||||
COMPLETION_MOVE.getDisplayName(),
|
||||
MOVE_CREATE_DIRECTORY.getDisplayName(),
|
||||
SFTPTransfer.DISABLE_DIRECTORY_LISTING.getDescription())).build();
|
||||
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(HOSTNAME);
|
||||
|
@ -60,6 +68,8 @@ public class FetchSFTP extends FetchFileTransfer {
|
|||
properties.add(REMOTE_FILENAME);
|
||||
properties.add(COMPLETION_STRATEGY);
|
||||
properties.add(MOVE_DESTINATION_DIR);
|
||||
properties.add(MOVE_CREATE_DIRECTORY);
|
||||
properties.add(disableDirectoryListing);
|
||||
properties.add(SFTPTransfer.CONNECTION_TIMEOUT);
|
||||
properties.add(SFTPTransfer.DATA_TIMEOUT);
|
||||
properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT);
|
||||
|
|
|
@ -106,11 +106,7 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
|
|||
if (rootPath == null) {
|
||||
workingDirPath = null;
|
||||
} else {
|
||||
File workingDirectory = new File(rootPath);
|
||||
if (!workingDirectory.getPath().startsWith("/") && !workingDirectory.getPath().startsWith("\\")) {
|
||||
workingDirectory = new File(transfer.getHomeDirectory(flowFile), workingDirectory.getPath());
|
||||
}
|
||||
workingDirPath = workingDirectory.getPath().replace("\\", "/");
|
||||
workingDirPath = transfer.getAbsolutePath(flowFile, rootPath);
|
||||
}
|
||||
|
||||
final boolean rejectZeroByteFiles = context.getProperty(FileTransfer.REJECT_ZERO_BYTE).asBoolean();
|
||||
|
|
|
@ -58,6 +58,22 @@ public interface FileTransfer extends Closeable {
|
|||
|
||||
void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException;
|
||||
|
||||
/**
|
||||
* Compute an absolute file path for the given remote path.
|
||||
* @param flowFile is used to setup file transfer client with its attribute values, to get user home directory
|
||||
* @param remotePath the target remote path
|
||||
* @return The absolute path for the given remote path
|
||||
*/
|
||||
default String getAbsolutePath(FlowFile flowFile, String remotePath) throws IOException {
|
||||
final String absoluteRemotePath;
|
||||
if (!remotePath.startsWith("/") && !remotePath.startsWith("\\")) {
|
||||
absoluteRemotePath = new File(getHomeDirectory(flowFile), remotePath).getPath();
|
||||
} else {
|
||||
absoluteRemotePath = remotePath;
|
||||
}
|
||||
return absoluteRemotePath.replace("\\", "/");
|
||||
}
|
||||
|
||||
public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
|
||||
.name("Hostname")
|
||||
.description("The fully qualified hostname or IP address of the remote system")
|
||||
|
|
|
@ -106,7 +106,7 @@ public class SFTPTransfer implements FileTransfer {
|
|||
.description("If set to 'true', directory listing is not performed prior to create missing directories." +
|
||||
" By default, this processor executes a directory listing command" +
|
||||
" to see target directory existence before creating missing directories." +
|
||||
" However, there are situations that you might need to disable the directory listing such as followings." +
|
||||
" However, there are situations that you might need to disable the directory listing such as the following." +
|
||||
" Directory listing might fail with some permission setups (e.g. chmod 100) on a directory." +
|
||||
" Also, if any other SFTP client created the directory after this processor performed a listing" +
|
||||
" and before a directory creation request by this processor is finished," +
|
||||
|
@ -353,48 +353,52 @@ public class SFTPTransfer implements FileTransfer {
|
|||
final String remoteDirectory = directoryName.getAbsolutePath().replace("\\", "/").replaceAll("^.\\:", "");
|
||||
|
||||
// if we disable the directory listing, we just want to blindly perform the mkdir command,
|
||||
// eating any exceptions thrown (like if the directory already exists).
|
||||
// eating failure exceptions thrown (like if the directory already exists).
|
||||
if (disableDirectoryListing) {
|
||||
try {
|
||||
// Blindly create the dir.
|
||||
channel.mkdir(remoteDirectory);
|
||||
// The remote directory did not exist, and was created successfully.
|
||||
return;
|
||||
} catch (SftpException e) {
|
||||
if (e.id != ChannelSftp.SSH_FX_FAILURE) {
|
||||
if (e.id == ChannelSftp.SSH_FX_NO_SUCH_FILE) {
|
||||
// No Such File. This happens when parent directory was not found.
|
||||
logger.debug(String.format("Could not create %s due to 'No such file'. Will try to create the parent dir.", remoteDirectory));
|
||||
} else if (e.id == ChannelSftp.SSH_FX_FAILURE) {
|
||||
// Swallow '4: Failure' including the remote directory already exists.
|
||||
logger.debug("Could not blindly create remote directory due to " + e.getMessage(), e);
|
||||
return;
|
||||
} else {
|
||||
throw new IOException("Could not blindly create remote directory due to " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
// end if disableDirectoryListing
|
||||
|
||||
boolean exists;
|
||||
try {
|
||||
channel.stat(remoteDirectory);
|
||||
exists = true;
|
||||
} catch (final SftpException e) {
|
||||
if (e.id == ChannelSftp.SSH_FX_NO_SUCH_FILE) {
|
||||
// No Such File
|
||||
exists = false;
|
||||
} else {
|
||||
throw new IOException("Failed to determine if remote directory exists at " + remoteDirectory + " due to " + e, e);
|
||||
}
|
||||
}
|
||||
|
||||
if (!exists) {
|
||||
// first ensure parent directories exist before creating this one
|
||||
if (directoryName.getParent() != null && !directoryName.getParentFile().equals(new File(File.separator))) {
|
||||
ensureDirectoryExists(flowFile, directoryName.getParentFile());
|
||||
}
|
||||
logger.debug("Remote Directory {} does not exist; creating it", new Object[] {remoteDirectory});
|
||||
} else {
|
||||
try {
|
||||
channel.mkdir(remoteDirectory);
|
||||
logger.debug("Created {}", new Object[] {remoteDirectory});
|
||||
// Check dir existence.
|
||||
channel.stat(remoteDirectory);
|
||||
// The remote directory already exists.
|
||||
return;
|
||||
} catch (final SftpException e) {
|
||||
throw new IOException("Failed to create remote directory " + remoteDirectory + " due to " + e, e);
|
||||
if (e.id != ChannelSftp.SSH_FX_NO_SUCH_FILE) {
|
||||
throw new IOException("Failed to determine if remote directory exists at " + remoteDirectory + " due to " + e, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// first ensure parent directories exist before creating this one
|
||||
if (directoryName.getParent() != null && !directoryName.getParentFile().equals(new File(File.separator))) {
|
||||
ensureDirectoryExists(flowFile, directoryName.getParentFile());
|
||||
}
|
||||
logger.debug("Remote Directory {} does not exist; creating it", new Object[] {remoteDirectory});
|
||||
try {
|
||||
channel.mkdir(remoteDirectory);
|
||||
logger.debug("Created {}", new Object[] {remoteDirectory});
|
||||
} catch (final SftpException e) {
|
||||
throw new IOException("Failed to create remote directory " + remoteDirectory + " due to " + e, e);
|
||||
}
|
||||
}
|
||||
|
||||
private ChannelSftp getChannel(final FlowFile flowFile) throws IOException {
|
||||
protected ChannelSftp getChannel(final FlowFile flowFile) throws IOException {
|
||||
if (sftp != null) {
|
||||
String sessionhost = session.getHost();
|
||||
String desthost = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
|
||||
|
@ -633,7 +637,7 @@ public class SFTPTransfer implements FileTransfer {
|
|||
} catch (final SftpException e) {
|
||||
switch (e.id) {
|
||||
case ChannelSftp.SSH_FX_NO_SUCH_FILE:
|
||||
throw new FileNotFoundException();
|
||||
throw new FileNotFoundException("No such file or directory");
|
||||
case ChannelSftp.SSH_FX_PERMISSION_DENIED:
|
||||
throw new PermissionDeniedException("Could not rename remote file " + source + " to " + target + " due to insufficient permissions");
|
||||
default:
|
||||
|
|
|
@ -130,6 +130,7 @@ public class TestFetchFileTransfer {
|
|||
runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
|
||||
runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
|
||||
runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved");
|
||||
runner.setProperty(FetchFileTransfer.MOVE_CREATE_DIRECTORY, "true");
|
||||
|
||||
proc.addContent("hello.txt", "world".getBytes());
|
||||
final Map<String, String> attrs = new HashMap<>();
|
||||
|
@ -228,10 +229,35 @@ public class TestFetchFileTransfer {
|
|||
assertTrue(proc.fileContents.containsKey("hello.txt"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDirFails() {
|
||||
final TestableFetchFileTransfer proc = new TestableFetchFileTransfer();
|
||||
final TestRunner runner = TestRunners.newTestRunner(proc);
|
||||
runner.setProperty(FetchFileTransfer.HOSTNAME, "localhost");
|
||||
runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, "11");
|
||||
runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "${filename}");
|
||||
runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
|
||||
runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/");
|
||||
runner.setProperty(FetchFileTransfer.MOVE_CREATE_DIRECTORY, "true");
|
||||
|
||||
proc.addContent("hello.txt", "world".getBytes());
|
||||
final Map<String, String> attrs = new HashMap<>();
|
||||
attrs.put("filename", "hello.txt");
|
||||
runner.enqueue(new byte[0], attrs);
|
||||
proc.allowCreateDir = false;
|
||||
|
||||
runner.run(1, false, false);
|
||||
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);
|
||||
assertEquals(1, proc.fileContents.size());
|
||||
|
||||
assertTrue(proc.fileContents.containsKey("hello.txt"));
|
||||
}
|
||||
|
||||
|
||||
private static class TestableFetchFileTransfer extends FetchFileTransfer {
|
||||
private boolean allowAccess = true;
|
||||
private boolean allowDelete = true;
|
||||
private boolean allowCreateDir = true;
|
||||
private boolean allowRename = true;
|
||||
private boolean closed = false;
|
||||
private final Map<String, byte[]> fileContents = new HashMap<>();
|
||||
|
@ -340,7 +366,9 @@ public class TestFetchFileTransfer {
|
|||
|
||||
@Override
|
||||
public void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throws IOException {
|
||||
|
||||
if (!allowCreateDir) {
|
||||
throw new PermissionDeniedException("test permission denied");
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -0,0 +1,247 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
|
||||
import com.jcraft.jsch.ChannelSftp;
|
||||
import com.jcraft.jsch.SftpException;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.MockPropertyValue;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static com.jcraft.jsch.ChannelSftp.SSH_FX_FAILURE;
|
||||
import static com.jcraft.jsch.ChannelSftp.SSH_FX_NO_SUCH_FILE;
|
||||
import static com.jcraft.jsch.ChannelSftp.SSH_FX_PERMISSION_DENIED;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestSFTPTransfer {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(TestSFTPTransfer.class);
|
||||
|
||||
private SFTPTransfer createSftpTransfer(ProcessContext processContext, ChannelSftp channel) {
|
||||
final ComponentLog componentLog = mock(ComponentLog.class);
|
||||
return new SFTPTransfer(processContext, componentLog) {
|
||||
@Override
|
||||
protected ChannelSftp getChannel(FlowFile flowFile) throws IOException {
|
||||
return channel;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnsureDirectoryExistsAlreadyExisted() throws IOException, SftpException {
|
||||
final ProcessContext processContext = mock(ProcessContext.class);
|
||||
final ChannelSftp channel = mock(ChannelSftp.class);
|
||||
final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
|
||||
final MockFlowFile flowFile = new MockFlowFile(0);
|
||||
final File remoteDir = new File("/dir1/dir2/dir3");
|
||||
sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
|
||||
|
||||
// Dir existence check should be done by stat
|
||||
verify(channel).stat(eq("/dir1/dir2/dir3"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnsureDirectoryExistsFailedToStat() throws IOException, SftpException {
|
||||
final ProcessContext processContext = mock(ProcessContext.class);
|
||||
final ChannelSftp channel = mock(ChannelSftp.class);
|
||||
// stat for the parent was successful, simulating that dir2 exists, but no dir3.
|
||||
when(channel.stat("/dir1/dir2/dir3")).thenThrow(new SftpException(SSH_FX_FAILURE, "Failure"));
|
||||
|
||||
final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
|
||||
final MockFlowFile flowFile = new MockFlowFile(0);
|
||||
final File remoteDir = new File("/dir1/dir2/dir3");
|
||||
try {
|
||||
sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
|
||||
fail("Should fail");
|
||||
} catch (IOException e) {
|
||||
assertEquals("Failed to determine if remote directory exists at /dir1/dir2/dir3 due to 4: Failure", e.getMessage());
|
||||
}
|
||||
|
||||
// Dir existence check should be done by stat
|
||||
verify(channel).stat(eq("/dir1/dir2/dir3"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnsureDirectoryExistsNotExisted() throws IOException, SftpException {
|
||||
final ProcessContext processContext = mock(ProcessContext.class);
|
||||
final ChannelSftp channel = mock(ChannelSftp.class);
|
||||
// stat for the parent was successful, simulating that dir2 exists, but no dir3.
|
||||
when(channel.stat("/dir1/dir2/dir3")).thenThrow(new SftpException(SSH_FX_NO_SUCH_FILE, "No such file"));
|
||||
|
||||
final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
|
||||
final MockFlowFile flowFile = new MockFlowFile(0);
|
||||
final File remoteDir = new File("/dir1/dir2/dir3");
|
||||
sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
|
||||
|
||||
// Dir existence check should be done by stat
|
||||
verify(channel).stat(eq("/dir1/dir2/dir3")); // dir3 was not found
|
||||
verify(channel).stat(eq("/dir1/dir2")); // so, dir2 was checked
|
||||
verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir2 existed, so dir3 was created.
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnsureDirectoryExistsParentNotExisted() throws IOException, SftpException {
|
||||
final ProcessContext processContext = mock(ProcessContext.class);
|
||||
final ChannelSftp channel = mock(ChannelSftp.class);
|
||||
// stat for the dir1 was successful, simulating that dir1 exists, but no dir2 and dir3.
|
||||
when(channel.stat("/dir1/dir2/dir3")).thenThrow(new SftpException(SSH_FX_NO_SUCH_FILE, "No such file"));
|
||||
when(channel.stat("/dir1/dir2")).thenThrow(new SftpException(SSH_FX_NO_SUCH_FILE, "No such file"));
|
||||
|
||||
final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
|
||||
final MockFlowFile flowFile = new MockFlowFile(0);
|
||||
final File remoteDir = new File("/dir1/dir2/dir3");
|
||||
sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
|
||||
|
||||
// Dir existence check should be done by stat
|
||||
verify(channel).stat(eq("/dir1/dir2/dir3")); // dir3 was not found
|
||||
verify(channel).stat(eq("/dir1/dir2")); // dir2 was not found, too
|
||||
verify(channel).stat(eq("/dir1")); // dir1 was found
|
||||
verify(channel).mkdir(eq("/dir1/dir2")); // dir1 existed, so dir2 was created.
|
||||
verify(channel).mkdir(eq("/dir1/dir2/dir3")); // then dir3 was created.
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnsureDirectoryExistsNotExistedFailedToCreate() throws IOException, SftpException {
|
||||
final ProcessContext processContext = mock(ProcessContext.class);
|
||||
final ChannelSftp channel = mock(ChannelSftp.class);
|
||||
// stat for the parent was successful, simulating that dir2 exists, but no dir3.
|
||||
when(channel.stat("/dir1/dir2/dir3")).thenThrow(new SftpException(SSH_FX_NO_SUCH_FILE, "No such file"));
|
||||
// Failed to create dir3.
|
||||
doThrow(new SftpException(SSH_FX_FAILURE, "Failed")).when(channel).mkdir(eq("/dir1/dir2/dir3"));
|
||||
|
||||
final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
|
||||
final MockFlowFile flowFile = new MockFlowFile(0);
|
||||
final File remoteDir = new File("/dir1/dir2/dir3");
|
||||
try {
|
||||
sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
|
||||
fail("Should fail");
|
||||
} catch (IOException e) {
|
||||
assertEquals("Failed to create remote directory /dir1/dir2/dir3 due to 4: Failed", e.getMessage());
|
||||
}
|
||||
|
||||
// Dir existence check should be done by stat
|
||||
verify(channel).stat(eq("/dir1/dir2/dir3")); // dir3 was not found
|
||||
verify(channel).stat(eq("/dir1/dir2")); // so, dir2 was checked
|
||||
verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir2 existed, so dir3 was created.
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnsureDirectoryExistsBlindlyNotExisted() throws IOException, SftpException {
|
||||
final ProcessContext processContext = mock(ProcessContext.class);
|
||||
when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true"));
|
||||
|
||||
final ChannelSftp channel = mock(ChannelSftp.class);
|
||||
final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
|
||||
final MockFlowFile flowFile = new MockFlowFile(0);
|
||||
final File remoteDir = new File("/dir1/dir2/dir3");
|
||||
sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
|
||||
|
||||
// stat should not be called.
|
||||
verify(channel, times(0)).stat(eq("/dir1/dir2/dir3"));
|
||||
verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir3 was created blindly.
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnsureDirectoryExistsBlindlyParentNotExisted() throws IOException, SftpException {
|
||||
final ProcessContext processContext = mock(ProcessContext.class);
|
||||
when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true"));
|
||||
|
||||
final ChannelSftp channel = mock(ChannelSftp.class);
|
||||
final AtomicInteger mkdirCount = new AtomicInteger(0);
|
||||
doAnswer(invocation -> {
|
||||
final int cnt = mkdirCount.getAndIncrement();
|
||||
if (cnt == 0) {
|
||||
// If the parent dir does not exist, no such file exception is thrown.
|
||||
throw new SftpException(SSH_FX_NO_SUCH_FILE, "Failure");
|
||||
} else {
|
||||
logger.info("Created the dir successfully for the 2nd time");
|
||||
}
|
||||
return true;
|
||||
}).when(channel).mkdir(eq("/dir1/dir2/dir3"));
|
||||
|
||||
final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
|
||||
final MockFlowFile flowFile = new MockFlowFile(0);
|
||||
final File remoteDir = new File("/dir1/dir2/dir3");
|
||||
sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
|
||||
|
||||
// stat should not be called.
|
||||
verify(channel, times(0)).stat(eq("/dir1/dir2/dir3"));
|
||||
// dir3 was created blindly, but failed for the 1st time, and succeeded for the 2nd time.
|
||||
verify(channel, times(2)).mkdir(eq("/dir1/dir2/dir3"));
|
||||
verify(channel).mkdir(eq("/dir1/dir2")); // dir2 was created successfully.
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnsureDirectoryExistsBlindlyAlreadyExisted() throws IOException, SftpException {
|
||||
final ProcessContext processContext = mock(ProcessContext.class);
|
||||
when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true"));
|
||||
|
||||
final ChannelSftp channel = mock(ChannelSftp.class);
|
||||
// If the dir existed, a failure exception is thrown, but should be swallowed.
|
||||
doThrow(new SftpException(SSH_FX_FAILURE, "Failure")).when(channel).mkdir(eq("/dir1/dir2/dir3"));
|
||||
|
||||
final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
|
||||
final MockFlowFile flowFile = new MockFlowFile(0);
|
||||
final File remoteDir = new File("/dir1/dir2/dir3");
|
||||
sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
|
||||
|
||||
// stat should not be called.
|
||||
verify(channel, times(0)).stat(eq("/dir1/dir2/dir3"));
|
||||
verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir3 was created blindly.
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnsureDirectoryExistsBlindlyFailed() throws IOException, SftpException {
|
||||
final ProcessContext processContext = mock(ProcessContext.class);
|
||||
when(processContext.getProperty(SFTPTransfer.DISABLE_DIRECTORY_LISTING)).thenReturn(new MockPropertyValue("true"));
|
||||
|
||||
final ChannelSftp channel = mock(ChannelSftp.class);
|
||||
doThrow(new SftpException(SSH_FX_PERMISSION_DENIED, "Permission denied")).when(channel).mkdir(eq("/dir1/dir2/dir3"));
|
||||
|
||||
final SFTPTransfer sftpTransfer = createSftpTransfer(processContext, channel);
|
||||
final MockFlowFile flowFile = new MockFlowFile(0);
|
||||
final File remoteDir = new File("/dir1/dir2/dir3");
|
||||
try {
|
||||
sftpTransfer.ensureDirectoryExists(flowFile, remoteDir);
|
||||
fail("Should fail");
|
||||
} catch (IOException e) {
|
||||
assertEquals("Could not blindly create remote directory due to Permission denied", e.getMessage());
|
||||
}
|
||||
|
||||
// stat should not be called.
|
||||
verify(channel, times(0)).stat(eq("/dir1/dir2/dir3"));
|
||||
verify(channel).mkdir(eq("/dir1/dir2/dir3")); // dir3 was created blindly.
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue