mirror of https://github.com/apache/nifi.git
NIFI-4386 Fixing connection handling in FetchFileTransfer
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #2203.
This commit is contained in:
parent
13e42678b6
commit
883c223ced
|
@ -17,20 +17,6 @@
|
||||||
|
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.BlockingQueue;
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||||
import org.apache.nifi.components.AllowableValue;
|
import org.apache.nifi.components.AllowableValue;
|
||||||
|
@ -50,6 +36,20 @@ import org.apache.nifi.stream.io.StreamUtils;
|
||||||
import org.apache.nifi.util.StopWatch;
|
import org.apache.nifi.util.StopWatch;
|
||||||
import org.apache.nifi.util.Tuple;
|
import org.apache.nifi.util.Tuple;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A base class for FetchSFTP, FetchFTP processors.
|
* A base class for FetchSFTP, FetchFTP processors.
|
||||||
*
|
*
|
||||||
|
@ -230,97 +230,110 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
|
||||||
transfer = transferWrapper.getFileTransfer();
|
transfer = transferWrapper.getFileTransfer();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pull data from remote system.
|
boolean closeConnection = false;
|
||||||
final InputStream in;
|
|
||||||
try {
|
try {
|
||||||
in = transfer.getInputStream(filename, flowFile);
|
// Pull data from remote system.
|
||||||
|
final InputStream in;
|
||||||
|
try {
|
||||||
|
in = transfer.getInputStream(filename, flowFile);
|
||||||
|
|
||||||
flowFile = session.write(flowFile, new OutputStreamCallback() {
|
flowFile = session.write(flowFile, new OutputStreamCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void process(final OutputStream out) throws IOException {
|
public void process(final OutputStream out) throws IOException {
|
||||||
StreamUtils.copy(in, out);
|
StreamUtils.copy(in, out);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!transfer.flush(flowFile)) {
|
||||||
|
throw new IOException("completePendingCommand returned false, file transfer failed");
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
|
||||||
if(!transfer.flush(flowFile)) {
|
|
||||||
throw new IOException("completePendingCommand returned false, file transfer failed");
|
|
||||||
}
|
|
||||||
|
|
||||||
transferQueue.offer(new FileTransferIdleWrapper(transfer, System.nanoTime()));
|
|
||||||
} catch (final FileNotFoundException e) {
|
|
||||||
getLogger().error("Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}",
|
|
||||||
new Object[] {flowFile, filename, host, REL_NOT_FOUND.getName()});
|
|
||||||
session.transfer(session.penalize(flowFile), REL_NOT_FOUND);
|
|
||||||
session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);
|
|
||||||
return;
|
|
||||||
} catch (final PermissionDeniedException e) {
|
|
||||||
getLogger().error("Failed to fetch content for {} from filename {} on remote host {} due to insufficient permissions; routing to {}",
|
|
||||||
new Object[] {flowFile, filename, host, REL_PERMISSION_DENIED.getName()});
|
|
||||||
session.transfer(session.penalize(flowFile), REL_PERMISSION_DENIED);
|
|
||||||
session.getProvenanceReporter().route(flowFile, REL_PERMISSION_DENIED);
|
|
||||||
return;
|
|
||||||
} catch (final ProcessException | IOException e) {
|
|
||||||
try {
|
|
||||||
transfer.close();
|
|
||||||
} catch (final IOException e1) {
|
|
||||||
getLogger().warn("Failed to close connection to {}:{} due to {}", new Object[] {host, port, e.toString()}, e);
|
|
||||||
}
|
|
||||||
|
|
||||||
getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to comms.failure",
|
|
||||||
new Object[] {flowFile, filename, host, port, e.toString()}, e);
|
|
||||||
session.transfer(session.penalize(flowFile), REL_COMMS_FAILURE);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add FlowFile attributes
|
|
||||||
final String protocolName = transfer.getProtocolName();
|
|
||||||
final Map<String, String> attributes = new HashMap<>();
|
|
||||||
attributes.put(protocolName + ".remote.host", host);
|
|
||||||
attributes.put(protocolName + ".remote.port", String.valueOf(port));
|
|
||||||
attributes.put(protocolName + ".remote.filename", filename);
|
|
||||||
|
|
||||||
if (filename.contains("/")) {
|
|
||||||
final String path = StringUtils.substringBeforeLast(filename, "/");
|
|
||||||
final String filenameOnly = StringUtils.substringAfterLast(filename, "/");
|
|
||||||
attributes.put(CoreAttributes.PATH.key(), path);
|
|
||||||
attributes.put(CoreAttributes.FILENAME.key(), filenameOnly);
|
|
||||||
} else {
|
|
||||||
attributes.put(CoreAttributes.FILENAME.key(), filename);
|
|
||||||
}
|
|
||||||
flowFile = session.putAllAttributes(flowFile, attributes);
|
|
||||||
|
|
||||||
// emit provenance event and transfer FlowFile
|
|
||||||
session.getProvenanceReporter().fetch(flowFile, protocolName + "://" + host + ":" + port + "/" + filename, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
|
|
||||||
session.transfer(flowFile, REL_SUCCESS);
|
|
||||||
|
|
||||||
// it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where
|
|
||||||
// we ingest the data, delete/move the remote file, and then NiFi dies/is shut down before the session is committed. This would
|
|
||||||
// result in data loss! If we commit the session first, we are safe.
|
|
||||||
session.commit();
|
|
||||||
|
|
||||||
final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue();
|
|
||||||
if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) {
|
|
||||||
try {
|
|
||||||
transfer.deleteFile(flowFile, null, filename);
|
|
||||||
} catch (final FileNotFoundException e) {
|
} catch (final FileNotFoundException e) {
|
||||||
// file doesn't exist -- effectively the same as removing it. Move on.
|
closeConnection = false;
|
||||||
} catch (final IOException ioe) {
|
getLogger().error("Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}",
|
||||||
getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}",
|
new Object[]{flowFile, filename, host, REL_NOT_FOUND.getName()});
|
||||||
new Object[] {flowFile, host, port, filename, ioe}, ioe);
|
session.transfer(session.penalize(flowFile), REL_NOT_FOUND);
|
||||||
|
session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);
|
||||||
|
return;
|
||||||
|
} catch (final PermissionDeniedException e) {
|
||||||
|
closeConnection = false;
|
||||||
|
getLogger().error("Failed to fetch content for {} from filename {} on remote host {} due to insufficient permissions; routing to {}",
|
||||||
|
new Object[]{flowFile, filename, host, REL_PERMISSION_DENIED.getName()});
|
||||||
|
session.transfer(session.penalize(flowFile), REL_PERMISSION_DENIED);
|
||||||
|
session.getProvenanceReporter().route(flowFile, REL_PERMISSION_DENIED);
|
||||||
|
return;
|
||||||
|
} catch (final ProcessException | IOException e) {
|
||||||
|
closeConnection = true;
|
||||||
|
getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to comms.failure",
|
||||||
|
new Object[]{flowFile, filename, host, port, e.toString()}, e);
|
||||||
|
session.transfer(session.penalize(flowFile), REL_COMMS_FAILURE);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
} else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
|
|
||||||
String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
|
|
||||||
if (!targetDir.endsWith("/")) {
|
|
||||||
targetDir = targetDir + "/";
|
|
||||||
}
|
|
||||||
final String simpleFilename = StringUtils.substringAfterLast(filename, "/");
|
|
||||||
final String target = targetDir + simpleFilename;
|
|
||||||
|
|
||||||
try {
|
// Add FlowFile attributes
|
||||||
transfer.rename(flowFile, filename, target);
|
final String protocolName = transfer.getProtocolName();
|
||||||
} catch (final IOException ioe) {
|
final Map<String, String> attributes = new HashMap<>();
|
||||||
getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}",
|
attributes.put(protocolName + ".remote.host", host);
|
||||||
new Object[] {flowFile, host, port, filename, ioe}, ioe);
|
attributes.put(protocolName + ".remote.port", String.valueOf(port));
|
||||||
|
attributes.put(protocolName + ".remote.filename", filename);
|
||||||
|
|
||||||
|
if (filename.contains("/")) {
|
||||||
|
final String path = StringUtils.substringBeforeLast(filename, "/");
|
||||||
|
final String filenameOnly = StringUtils.substringAfterLast(filename, "/");
|
||||||
|
attributes.put(CoreAttributes.PATH.key(), path);
|
||||||
|
attributes.put(CoreAttributes.FILENAME.key(), filenameOnly);
|
||||||
|
} else {
|
||||||
|
attributes.put(CoreAttributes.FILENAME.key(), filename);
|
||||||
|
}
|
||||||
|
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||||
|
|
||||||
|
// emit provenance event and transfer FlowFile
|
||||||
|
session.getProvenanceReporter().fetch(flowFile, protocolName + "://" + host + ":" + port + "/" + filename, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
|
||||||
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
|
|
||||||
|
// it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where
|
||||||
|
// we ingest the data, delete/move the remote file, and then NiFi dies/is shut down before the session is committed. This would
|
||||||
|
// result in data loss! If we commit the session first, we are safe.
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue();
|
||||||
|
if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) {
|
||||||
|
try {
|
||||||
|
transfer.deleteFile(flowFile, null, filename);
|
||||||
|
} catch (final FileNotFoundException e) {
|
||||||
|
// file doesn't exist -- effectively the same as removing it. Move on.
|
||||||
|
} catch (final IOException ioe) {
|
||||||
|
getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}",
|
||||||
|
new Object[]{flowFile, host, port, filename, ioe}, ioe);
|
||||||
|
}
|
||||||
|
} else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
|
||||||
|
String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
|
if (!targetDir.endsWith("/")) {
|
||||||
|
targetDir = targetDir + "/";
|
||||||
|
}
|
||||||
|
final String simpleFilename = StringUtils.substringAfterLast(filename, "/");
|
||||||
|
final String target = targetDir + simpleFilename;
|
||||||
|
|
||||||
|
try {
|
||||||
|
transfer.rename(flowFile, filename, target);
|
||||||
|
} catch (final IOException ioe) {
|
||||||
|
getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}",
|
||||||
|
new Object[]{flowFile, host, port, filename, ioe}, ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (transfer != null) {
|
||||||
|
if (closeConnection) {
|
||||||
|
getLogger().debug("Closing FileTransfer...");
|
||||||
|
try {
|
||||||
|
transfer.close();
|
||||||
|
} catch (final IOException e) {
|
||||||
|
getLogger().warn("Failed to close connection to {}:{} due to {}", new Object[]{host, port, e.getMessage()}, e);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
getLogger().debug("Returning FileTransfer to pool...");
|
||||||
|
transferQueue.offer(new FileTransferIdleWrapper(transfer, System.nanoTime()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue