From 9e6ee656b54a91384ce2d2d31cd2d0489534f61b Mon Sep 17 00:00:00 2001 From: Joe Witt Date: Mon, 26 Sep 2022 14:00:23 -0700 Subject: [PATCH] NIFI-10532 ensuring client gets reset if any of the key values host/port/user/pw change on a per ff basis (#6445) --- .../processors/standard/PutFileTransfer.java | 6 ++-- .../processors/standard/util/FTPTransfer.java | 18 +++++++++-- .../standard/util/SFTPTransfer.java | 30 +++++++++++++++++-- 3 files changed, 45 insertions(+), 9 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java index b729679f87..afb6bb6058 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFileTransfer.java @@ -102,10 +102,8 @@ public abstract class PutFileTransfer extends AbstractPr int fileCount = 0; try (final T transfer = getFileTransfer(context)) { do { - //check if hostname is regular expression requiring evaluation - if(context.getProperty(FileTransfer.HOSTNAME).isExpressionLanguagePresent()) { - hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(); - } + //evaluate again inside the loop as each flowfile can have a different hostname + hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(); final String rootPath = context.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions(flowFile).getValue(); final String workingDirPath; if (StringUtils.isBlank(rootPath)) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java index 2d3e7adb2f..f36c1ed612 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java @@ -32,6 +32,7 @@ import java.util.Date; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.function.Supplier; import java.util.regex.Pattern; @@ -49,6 +50,7 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.util.FileTransfer; import org.apache.nifi.processors.standard.ftp.FTPClientProvider; import org.apache.nifi.processors.standard.ftp.StandardFTPClientProvider; import org.apache.nifi.proxy.ProxyConfiguration; @@ -152,6 +154,9 @@ public class FTPTransfer implements FileTransfer { private FTPClient client; private String homeDirectory; private String remoteHostName; + private String remotePort; + private String remoteUsername; + private String remotePassword; public FTPTransfer(final ProcessContext context, final ComponentLog logger) { this.ctx = context; @@ -546,10 +551,16 @@ public class FTPTransfer implements FileTransfer { private FTPClient getClient(final FlowFile flowFile) throws IOException { final String hostname = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(); + final String port = ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).getValue(); + final String username = ctx.getProperty(FileTransfer.USERNAME).evaluateAttributeExpressions(flowFile).getValue(); + final String password = ctx.getProperty(FileTransfer.PASSWORD).evaluateAttributeExpressions(flowFile).getValue(); if (client != null) { - if (remoteHostName.equals(hostname)) { - // destination matches so we can keep our current session + if (Objects.equals(remoteHostName, hostname) + && Objects.equals(remotePort, port) + && Objects.equals(remoteUsername, username) + && Objects.equals(remotePassword, password)) { + // The key things match so we can keep our current session resetWorkingDirectory(); return client; } else { @@ -561,6 +572,9 @@ public class FTPTransfer implements FileTransfer { final Map attributes = flowFile == null ? Collections.emptyMap() : flowFile.getAttributes(); client = createClient(ctx, attributes); remoteHostName = hostname; + remotePort = port; + remoteUsername = username; + remotePassword = password; closed = false; homeDirectory = client.printWorkingDirectory(); return client; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java index 2e58ee122d..85eb806f61 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SFTPTransfer.java @@ -64,6 +64,7 @@ import java.util.Date; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -226,6 +227,12 @@ public class SFTPTransfer implements FileTransfer { private volatile boolean closed = false; private String homeDir; + private String activeHostname; + private String activePort; + private String activeUsername; + private String activePassword; + private String activePrivateKeyPath; + private String activePrivateKeyPassphrase; private final boolean disableDirectoryListing; @@ -580,12 +587,23 @@ public class SFTPTransfer implements FileTransfer { } protected SFTPClient getSFTPClient(final FlowFile flowFile) throws IOException { + final String evaledHostname = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(); + final String evaledPort = ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).getValue(); + final String evaledUsername = ctx.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue(); + final String evaledPassword = ctx.getProperty(PASSWORD).evaluateAttributeExpressions(flowFile).getValue(); + final String evaledPrivateKeyPath = ctx.getProperty(PRIVATE_KEY_PATH).evaluateAttributeExpressions(flowFile).getValue(); + final String evaledPrivateKeyPassphrase = ctx.getProperty(PRIVATE_KEY_PASSPHRASE).evaluateAttributeExpressions(flowFile).getValue(); + // If the client is already initialized then compare the host that the client is connected to with the current // host from the properties/flow-file, and if different then we need to close and reinitialize, if same we can reuse if (sftpClient != null) { - final String clientHost = sshClient.getRemoteHostname(); - final String propertiesHost = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue(); - if (clientHost.equals(propertiesHost)) { + if (Objects.equals(evaledHostname, activeHostname) + && Objects.equals(evaledPort, activePort) + && Objects.equals(evaledUsername, activeUsername) + && Objects.equals(evaledPassword, activePassword) + && Objects.equals(evaledPrivateKeyPath, activePrivateKeyPath) + && Objects.equals(evaledPrivateKeyPassphrase, activePrivateKeyPassphrase) + ) { // destination matches so we can keep our current session return sftpClient; } else { @@ -597,6 +615,12 @@ public class SFTPTransfer implements FileTransfer { final Map attributes = flowFile == null ? Collections.emptyMap() : flowFile.getAttributes(); this.sshClient = SSH_CLIENT_PROVIDER.getClient(ctx, attributes); this.sftpClient = new SFTPClient(new SFTPEngine(sshClient).init()); + activeHostname = evaledHostname; + activePort = evaledPort; + activePassword = evaledPassword; + activeUsername = evaledUsername; + activePrivateKeyPath = evaledPrivateKeyPath; + activePrivateKeyPassphrase = evaledPrivateKeyPassphrase; this.closed = false; // Configure timeout for sftp operations