diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java index 65b11ffe5a..5833819ea8 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java @@ -128,9 +128,8 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr + "that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. Users should " + "ensure that the FlowFile content does not contain the delimiter character to avoid errors. In order to use a new line character you can " + "enter '\\n'. For a tab character use '\\t'. Finally for a carriage return use '\\r'.") - .required(true) + .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .defaultValue("\\n") .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor CONNECTION_PER_FLOWFILE = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java index a70c9c5e97..70771f1cbd 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java @@ -19,17 +19,20 @@ package org.apache.nifi.processor.util.put.sender; import org.apache.commons.io.IOUtils; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream; import javax.net.ssl.SSLContext; import java.io.IOException; +import java.io.OutputStream; /** * Sends messages over an SSLSocketChannel. */ public class SSLSocketChannelSender extends SocketChannelSender { - private SSLSocketChannel sslChannel; private SSLContext sslContext; + private SSLSocketChannel sslChannel; + private SSLSocketChannelOutputStream sslOutputStream; public SSLSocketChannelSender(final String host, final int port, @@ -50,6 +53,7 @@ public class SSLSocketChannelSender extends SocketChannelSender { // SSLSocketChannel will check if already connected so we can safely call this sslChannel.connect(); + sslOutputStream = new SSLSocketChannelOutputStream(sslChannel); } @Override @@ -65,7 +69,14 @@ public class SSLSocketChannelSender extends SocketChannelSender { @Override public void close() { super.close(); + IOUtils.closeQuietly(sslOutputStream); IOUtils.closeQuietly(sslChannel); sslChannel = null; } + + @Override + public OutputStream getOutputStream() { + return sslOutputStream; + } + } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java index 8d4f8757a8..6f7796b21a 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java @@ -21,6 +21,7 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.remote.io.socket.SocketChannelOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; @@ -95,4 +96,9 @@ public class SocketChannelSender extends ChannelSender { socketChannelOutput = null; channel = null; } + + public OutputStream getOutputStream() { + return socketChannelOutput; + } + } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java index 34f6277609..75165d76d5 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java @@ -16,14 +16,7 @@ */ package org.apache.nifi.processors.standard; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.Charset; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.TimeUnit; - +import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; @@ -37,14 +30,21 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.put.AbstractPutEventProcessor; import org.apache.nifi.processor.util.put.sender.ChannelSender; +import org.apache.nifi.processor.util.put.sender.SocketChannelSender; import org.apache.nifi.ssl.SSLContextService; -import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; import javax.net.ssl.SSLContext; +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; /** *
@@ -178,15 +178,34 @@ public class PutTCP extends AbstractPutEventProcessor { return; } + // really shouldn't happen since we know the protocol is TCP here, but this is more graceful so we + // can cast to a SocketChannelSender later in order to obtain the OutputStream + if (!(sender instanceof SocketChannelSender)) { + getLogger().error("Processor can only be used with a SocketChannelSender, but obtained: " + sender.getClass().getCanonicalName()); + context.yield(); + return; + } + + boolean closeSender = isConnectionPerFlowFile(context); try { - String outgoingMessageDelimiter = getOutgoingMessageDelimiter(context, flowFile); - ByteArrayOutputStream content = readContent(session, flowFile); - if (outgoingMessageDelimiter != null) { - Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); - content = appendDelimiter(content, outgoingMessageDelimiter, charset); + // We might keep the connection open across invocations of the processor so don't auto-close this + final OutputStream out = ((SocketChannelSender)sender).getOutputStream(); + final String delimiter = getOutgoingMessageDelimiter(context, flowFile); + + final StopWatch stopWatch = new StopWatch(true); + try (final InputStream rawIn = session.read(flowFile); + final BufferedInputStream in = new BufferedInputStream(rawIn)) { + IOUtils.copy(in, out); + if (delimiter != null) { + final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue()); + out.write(delimiter.getBytes(charSet), 0, delimiter.length()); + } + out.flush(); + } catch (final Exception e) { + closeSender = true; + throw e; } - StopWatch stopWatch = new StopWatch(true); - sender.send(content.toByteArray()); + session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.transfer(flowFile, REL_SUCCESS); session.commit(); @@ -194,11 +213,10 @@ public class PutTCP extends AbstractPutEventProcessor { onFailure(context, session, flowFile); getLogger().error("Exception while handling a process session, transferring {} to failure.", new Object[] { flowFile }, e); } finally { - // If we are going to use this sender again, then relinquish it back to the pool. - if (!isConnectionPerFlowFile(context)) { - relinquishSender(sender); - } else { + if (closeSender) { sender.close(); + } else { + relinquishSender(sender); } } } @@ -220,43 +238,6 @@ public class PutTCP extends AbstractPutEventProcessor { context.yield(); } - /** - * Helper method to read the FlowFile content stream into a ByteArrayOutputStream object. - * - * @param session - * - the current process session. - * @param flowFile - * - the FlowFile to read the content from. - * - * @return ByteArrayOutputStream object containing the FlowFile content. - */ - protected ByteArrayOutputStream readContent(final ProcessSession session, final FlowFile flowFile) { - final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) flowFile.getSize() + 1); - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.copy(in, baos); - } - }); - - return baos; - } - - /** - * Helper method to append a delimiter to the message contents. - * - * @param content - * - the message contents. - * @param delimiter - * - the delimiter value. - * - * @return ByteArrayOutputStream object containing the new message contents. - */ - protected ByteArrayOutputStream appendDelimiter(final ByteArrayOutputStream content, final String delimiter, Charset charSet) { - content.write(delimiter.getBytes(charSet), 0, delimiter.length()); - return content; - } - /** * Gets the current value of the "Connection Per FlowFile" property. *