mirror of https://github.com/apache/nifi.git
NIFI-10532 ensuring client gets reset if any of the key values host/port/user/pw change on a per ff basis (#6445)
This commit is contained in:
parent
e5059e0514
commit
9e6ee656b5
|
@ -102,10 +102,8 @@ public abstract class PutFileTransfer<T extends FileTransfer> extends AbstractPr
|
||||||
int fileCount = 0;
|
int fileCount = 0;
|
||||||
try (final T transfer = getFileTransfer(context)) {
|
try (final T transfer = getFileTransfer(context)) {
|
||||||
do {
|
do {
|
||||||
//check if hostname is regular expression requiring evaluation
|
//evaluate again inside the loop as each flowfile can have a different hostname
|
||||||
if(context.getProperty(FileTransfer.HOSTNAME).isExpressionLanguagePresent()) {
|
|
||||||
hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
|
hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
}
|
|
||||||
final String rootPath = context.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions(flowFile).getValue();
|
final String rootPath = context.getProperty(FileTransfer.REMOTE_PATH).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
final String workingDirPath;
|
final String workingDirPath;
|
||||||
if (StringUtils.isBlank(rootPath)) {
|
if (StringUtils.isBlank(rootPath)) {
|
||||||
|
|
|
@ -32,6 +32,7 @@ import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
@ -49,6 +50,7 @@ import org.apache.nifi.processor.ProcessContext;
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.apache.nifi.processors.standard.util.FileTransfer;
|
||||||
import org.apache.nifi.processors.standard.ftp.FTPClientProvider;
|
import org.apache.nifi.processors.standard.ftp.FTPClientProvider;
|
||||||
import org.apache.nifi.processors.standard.ftp.StandardFTPClientProvider;
|
import org.apache.nifi.processors.standard.ftp.StandardFTPClientProvider;
|
||||||
import org.apache.nifi.proxy.ProxyConfiguration;
|
import org.apache.nifi.proxy.ProxyConfiguration;
|
||||||
|
@ -152,6 +154,9 @@ public class FTPTransfer implements FileTransfer {
|
||||||
private FTPClient client;
|
private FTPClient client;
|
||||||
private String homeDirectory;
|
private String homeDirectory;
|
||||||
private String remoteHostName;
|
private String remoteHostName;
|
||||||
|
private String remotePort;
|
||||||
|
private String remoteUsername;
|
||||||
|
private String remotePassword;
|
||||||
|
|
||||||
public FTPTransfer(final ProcessContext context, final ComponentLog logger) {
|
public FTPTransfer(final ProcessContext context, final ComponentLog logger) {
|
||||||
this.ctx = context;
|
this.ctx = context;
|
||||||
|
@ -546,10 +551,16 @@ public class FTPTransfer implements FileTransfer {
|
||||||
|
|
||||||
private FTPClient getClient(final FlowFile flowFile) throws IOException {
|
private FTPClient getClient(final FlowFile flowFile) throws IOException {
|
||||||
final String hostname = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
|
final String hostname = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
final String port = ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
final String username = ctx.getProperty(FileTransfer.USERNAME).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
final String password = ctx.getProperty(FileTransfer.PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
|
||||||
if (client != null) {
|
if (client != null) {
|
||||||
if (remoteHostName.equals(hostname)) {
|
if (Objects.equals(remoteHostName, hostname)
|
||||||
// destination matches so we can keep our current session
|
&& Objects.equals(remotePort, port)
|
||||||
|
&& Objects.equals(remoteUsername, username)
|
||||||
|
&& Objects.equals(remotePassword, password)) {
|
||||||
|
// The key things match so we can keep our current session
|
||||||
resetWorkingDirectory();
|
resetWorkingDirectory();
|
||||||
return client;
|
return client;
|
||||||
} else {
|
} else {
|
||||||
|
@ -561,6 +572,9 @@ public class FTPTransfer implements FileTransfer {
|
||||||
final Map<String, String> attributes = flowFile == null ? Collections.emptyMap() : flowFile.getAttributes();
|
final Map<String, String> attributes = flowFile == null ? Collections.emptyMap() : flowFile.getAttributes();
|
||||||
client = createClient(ctx, attributes);
|
client = createClient(ctx, attributes);
|
||||||
remoteHostName = hostname;
|
remoteHostName = hostname;
|
||||||
|
remotePort = port;
|
||||||
|
remoteUsername = username;
|
||||||
|
remotePassword = password;
|
||||||
closed = false;
|
closed = false;
|
||||||
homeDirectory = client.printWorkingDirectory();
|
homeDirectory = client.printWorkingDirectory();
|
||||||
return client;
|
return client;
|
||||||
|
|
|
@ -64,6 +64,7 @@ import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
@ -226,6 +227,12 @@ public class SFTPTransfer implements FileTransfer {
|
||||||
|
|
||||||
private volatile boolean closed = false;
|
private volatile boolean closed = false;
|
||||||
private String homeDir;
|
private String homeDir;
|
||||||
|
private String activeHostname;
|
||||||
|
private String activePort;
|
||||||
|
private String activeUsername;
|
||||||
|
private String activePassword;
|
||||||
|
private String activePrivateKeyPath;
|
||||||
|
private String activePrivateKeyPassphrase;
|
||||||
|
|
||||||
private final boolean disableDirectoryListing;
|
private final boolean disableDirectoryListing;
|
||||||
|
|
||||||
|
@ -580,12 +587,23 @@ public class SFTPTransfer implements FileTransfer {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected SFTPClient getSFTPClient(final FlowFile flowFile) throws IOException {
|
protected SFTPClient getSFTPClient(final FlowFile flowFile) throws IOException {
|
||||||
|
final String evaledHostname = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
final String evaledPort = ctx.getProperty(PORT).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
final String evaledUsername = ctx.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
final String evaledPassword = ctx.getProperty(PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
final String evaledPrivateKeyPath = ctx.getProperty(PRIVATE_KEY_PATH).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
final String evaledPrivateKeyPassphrase = ctx.getProperty(PRIVATE_KEY_PASSPHRASE).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
|
||||||
// If the client is already initialized then compare the host that the client is connected to with the current
|
// If the client is already initialized then compare the host that the client is connected to with the current
|
||||||
// host from the properties/flow-file, and if different then we need to close and reinitialize, if same we can reuse
|
// host from the properties/flow-file, and if different then we need to close and reinitialize, if same we can reuse
|
||||||
if (sftpClient != null) {
|
if (sftpClient != null) {
|
||||||
final String clientHost = sshClient.getRemoteHostname();
|
if (Objects.equals(evaledHostname, activeHostname)
|
||||||
final String propertiesHost = ctx.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
|
&& Objects.equals(evaledPort, activePort)
|
||||||
if (clientHost.equals(propertiesHost)) {
|
&& Objects.equals(evaledUsername, activeUsername)
|
||||||
|
&& Objects.equals(evaledPassword, activePassword)
|
||||||
|
&& Objects.equals(evaledPrivateKeyPath, activePrivateKeyPath)
|
||||||
|
&& Objects.equals(evaledPrivateKeyPassphrase, activePrivateKeyPassphrase)
|
||||||
|
) {
|
||||||
// destination matches so we can keep our current session
|
// destination matches so we can keep our current session
|
||||||
return sftpClient;
|
return sftpClient;
|
||||||
} else {
|
} else {
|
||||||
|
@ -597,6 +615,12 @@ public class SFTPTransfer implements FileTransfer {
|
||||||
final Map<String, String> attributes = flowFile == null ? Collections.emptyMap() : flowFile.getAttributes();
|
final Map<String, String> attributes = flowFile == null ? Collections.emptyMap() : flowFile.getAttributes();
|
||||||
this.sshClient = SSH_CLIENT_PROVIDER.getClient(ctx, attributes);
|
this.sshClient = SSH_CLIENT_PROVIDER.getClient(ctx, attributes);
|
||||||
this.sftpClient = new SFTPClient(new SFTPEngine(sshClient).init());
|
this.sftpClient = new SFTPClient(new SFTPEngine(sshClient).init());
|
||||||
|
activeHostname = evaledHostname;
|
||||||
|
activePort = evaledPort;
|
||||||
|
activePassword = evaledPassword;
|
||||||
|
activeUsername = evaledUsername;
|
||||||
|
activePrivateKeyPath = evaledPrivateKeyPath;
|
||||||
|
activePrivateKeyPassphrase = evaledPrivateKeyPassphrase;
|
||||||
this.closed = false;
|
this.closed = false;
|
||||||
|
|
||||||
// Configure timeout for sftp operations
|
// Configure timeout for sftp operations
|
||||||
|
|
Loading…
Reference in New Issue