NIFI-6181 FetchSFTP and FetchFTP File Not Found fix

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #3407.
This commit is contained in:
bdesert 2019-04-04 01:40:17 -04:00 committed by Pierre Villard
parent 96279415a7
commit 75217b33d0
No known key found for this signature in database
GPG Key ID: BEE1599F0726E9CD
4 changed files with 28 additions and 2 deletions

View File

@ -73,6 +73,7 @@ public class FetchFTP extends FetchFileTransfer {
properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
properties.add(FTPTransfer.BUFFER_SIZE);
properties.add(FILE_NOT_FOUND_LOG_LEVEL);
return properties;
}

View File

@ -18,12 +18,14 @@
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@ -120,6 +122,14 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
.required(false)
.build();
static final PropertyDescriptor FILE_NOT_FOUND_LOG_LEVEL = new PropertyDescriptor.Builder()
.displayName("Log level when file not found")
.name("fetchfiletransfer-notfound-loglevel")
.description("Log level to use in case the file does not exist when the processor is triggered")
.allowableValues(LogLevel.values())
.defaultValue(LogLevel.ERROR.toString()) // backward compatibility support
.required(true)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -141,6 +151,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
private final Map<Tuple<String, Integer>, BlockingQueue<FileTransferIdleWrapper>> fileTransferMap = new HashMap<>();
private final long IDLE_CONNECTION_MILLIS = TimeUnit.SECONDS.toMillis(10L); // amount of time to wait before closing an idle connection
private volatile long lastClearTime = System.currentTimeMillis();
private LogLevel levelFileNotFound = LogLevel.ERROR;
@Override
public Set<Relationship> getRelationships() {
@ -152,6 +163,12 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
return relationships;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
levelFileNotFound = LogLevel.valueOf(context.getProperty(FILE_NOT_FOUND_LOG_LEVEL).getValue());
}
/**
* Close connections that are idle or optionally close all connections.
* Connections are considered "idle" if they have not been used in 10 seconds.
@ -261,7 +278,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
} catch (final FileNotFoundException e) {
closeConnection = false;
getLogger().error("Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}",
getLogger().log(levelFileNotFound, "Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}",
new Object[]{flowFile, filename, host, REL_NOT_FOUND.getName()});
session.transfer(session.penalize(flowFile), REL_NOT_FOUND);
session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);

View File

@ -86,6 +86,7 @@ public class FetchSFTP extends FetchFileTransfer {
properties.add(FTPTransfer.PROXY_PORT);
properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
properties.add(FILE_NOT_FOUND_LOG_LEVEL);
return properties;
}

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard.util;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
@ -322,7 +323,13 @@ public class FTPTransfer implements FileTransfer {
final FTPClient client = getClient(flowFile);
InputStream in = client.retrieveFileStream(remoteFileName);
if (in == null) {
throw new IOException(client.getReplyString());
final String response = client.getReplyString();
// FTPClient doesn't throw exception if file not found.
// Instead, response string will contain: "550 Can't open <absolute_path>: No such file or directory"
if (response != null && response.trim().endsWith("No such file or directory")){
throw new FileNotFoundException(response);
}
throw new IOException(response);
}
return in;
}