diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java index 0023c4b2f5..b92ed989e1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java @@ -17,20 +17,6 @@ package org.apache.nifi.processors.standard; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; @@ -50,6 +36,20 @@ import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.Tuple; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + /** * A base class for FetchSFTP, FetchFTP processors. * @@ -230,97 +230,110 @@ public abstract class FetchFileTransfer extends AbstractProcessor { transfer = transferWrapper.getFileTransfer(); } - // Pull data from remote system. - final InputStream in; + boolean closeConnection = false; try { - in = transfer.getInputStream(filename, flowFile); + // Pull data from remote system. + final InputStream in; + try { + in = transfer.getInputStream(filename, flowFile); - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - StreamUtils.copy(in, out); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + StreamUtils.copy(in, out); + } + }); + + if (!transfer.flush(flowFile)) { + throw new IOException("completePendingCommand returned false, file transfer failed"); } - }); - if(!transfer.flush(flowFile)) { - throw new IOException("completePendingCommand returned false, file transfer failed"); - } - - transferQueue.offer(new FileTransferIdleWrapper(transfer, System.nanoTime())); - } catch (final FileNotFoundException e) { - getLogger().error("Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}", - new Object[] {flowFile, filename, host, REL_NOT_FOUND.getName()}); - session.transfer(session.penalize(flowFile), REL_NOT_FOUND); - session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND); - return; - } catch (final PermissionDeniedException e) { - getLogger().error("Failed to fetch content for {} from filename {} on remote host {} due to insufficient permissions; routing to {}", - new Object[] {flowFile, filename, host, REL_PERMISSION_DENIED.getName()}); - session.transfer(session.penalize(flowFile), REL_PERMISSION_DENIED); - session.getProvenanceReporter().route(flowFile, REL_PERMISSION_DENIED); - return; - } catch (final ProcessException | IOException e) { - try { - transfer.close(); - } catch (final IOException e1) { - getLogger().warn("Failed to close connection to {}:{} due to {}", new Object[] {host, port, e.toString()}, e); - } - - getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to comms.failure", - new Object[] {flowFile, filename, host, port, e.toString()}, e); - session.transfer(session.penalize(flowFile), REL_COMMS_FAILURE); - return; - } - - // Add FlowFile attributes - final String protocolName = transfer.getProtocolName(); - final Map attributes = new HashMap<>(); - attributes.put(protocolName + ".remote.host", host); - attributes.put(protocolName + ".remote.port", String.valueOf(port)); - attributes.put(protocolName + ".remote.filename", filename); - - if (filename.contains("/")) { - final String path = StringUtils.substringBeforeLast(filename, "/"); - final String filenameOnly = StringUtils.substringAfterLast(filename, "/"); - attributes.put(CoreAttributes.PATH.key(), path); - attributes.put(CoreAttributes.FILENAME.key(), filenameOnly); - } else { - attributes.put(CoreAttributes.FILENAME.key(), filename); - } - flowFile = session.putAllAttributes(flowFile, attributes); - - // emit provenance event and transfer FlowFile - session.getProvenanceReporter().fetch(flowFile, protocolName + "://" + host + ":" + port + "/" + filename, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - session.transfer(flowFile, REL_SUCCESS); - - // it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where - // we ingest the data, delete/move the remote file, and then NiFi dies/is shut down before the session is committed. This would - // result in data loss! If we commit the session first, we are safe. - session.commit(); - - final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue(); - if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) { - try { - transfer.deleteFile(flowFile, null, filename); } catch (final FileNotFoundException e) { - // file doesn't exist -- effectively the same as removing it. Move on. - } catch (final IOException ioe) { - getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}", - new Object[] {flowFile, host, port, filename, ioe}, ioe); + closeConnection = false; + getLogger().error("Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}", + new Object[]{flowFile, filename, host, REL_NOT_FOUND.getName()}); + session.transfer(session.penalize(flowFile), REL_NOT_FOUND); + session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND); + return; + } catch (final PermissionDeniedException e) { + closeConnection = false; + getLogger().error("Failed to fetch content for {} from filename {} on remote host {} due to insufficient permissions; routing to {}", + new Object[]{flowFile, filename, host, REL_PERMISSION_DENIED.getName()}); + session.transfer(session.penalize(flowFile), REL_PERMISSION_DENIED); + session.getProvenanceReporter().route(flowFile, REL_PERMISSION_DENIED); + return; + } catch (final ProcessException | IOException e) { + closeConnection = true; + getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to comms.failure", + new Object[]{flowFile, filename, host, port, e.toString()}, e); + session.transfer(session.penalize(flowFile), REL_COMMS_FAILURE); + return; } - } else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) { - String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue(); - if (!targetDir.endsWith("/")) { - targetDir = targetDir + "/"; - } - final String simpleFilename = StringUtils.substringAfterLast(filename, "/"); - final String target = targetDir + simpleFilename; - try { - transfer.rename(flowFile, filename, target); - } catch (final IOException ioe) { - getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}", - new Object[] {flowFile, host, port, filename, ioe}, ioe); + // Add FlowFile attributes + final String protocolName = transfer.getProtocolName(); + final Map attributes = new HashMap<>(); + attributes.put(protocolName + ".remote.host", host); + attributes.put(protocolName + ".remote.port", String.valueOf(port)); + attributes.put(protocolName + ".remote.filename", filename); + + if (filename.contains("/")) { + final String path = StringUtils.substringBeforeLast(filename, "/"); + final String filenameOnly = StringUtils.substringAfterLast(filename, "/"); + attributes.put(CoreAttributes.PATH.key(), path); + attributes.put(CoreAttributes.FILENAME.key(), filenameOnly); + } else { + attributes.put(CoreAttributes.FILENAME.key(), filename); + } + flowFile = session.putAllAttributes(flowFile, attributes); + + // emit provenance event and transfer FlowFile + session.getProvenanceReporter().fetch(flowFile, protocolName + "://" + host + ":" + port + "/" + filename, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, REL_SUCCESS); + + // it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where + // we ingest the data, delete/move the remote file, and then NiFi dies/is shut down before the session is committed. This would + // result in data loss! If we commit the session first, we are safe. + session.commit(); + + final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue(); + if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) { + try { + transfer.deleteFile(flowFile, null, filename); + } catch (final FileNotFoundException e) { + // file doesn't exist -- effectively the same as removing it. Move on. + } catch (final IOException ioe) { + getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}", + new Object[]{flowFile, host, port, filename, ioe}, ioe); + } + } else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) { + String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue(); + if (!targetDir.endsWith("/")) { + targetDir = targetDir + "/"; + } + final String simpleFilename = StringUtils.substringAfterLast(filename, "/"); + final String target = targetDir + simpleFilename; + + try { + transfer.rename(flowFile, filename, target); + } catch (final IOException ioe) { + getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}", + new Object[]{flowFile, host, port, filename, ioe}, ioe); + } + } + } finally { + if (transfer != null) { + if (closeConnection) { + getLogger().debug("Closing FileTransfer..."); + try { + transfer.close(); + } catch (final IOException e) { + getLogger().warn("Failed to close connection to {}:{} due to {}", new Object[]{host, port, e.getMessage()}, e); + } + } else { + getLogger().debug("Returning FileTransfer to pool..."); + transferQueue.offer(new FileTransferIdleWrapper(transfer, System.nanoTime())); + } } } }