NIFI-4157 Improvements to PutTCP

Signed-off-by: Bryan Bende <bbende@apache.org>
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #1989.
This commit is contained in:
Bryan Bende 2017-07-06 15:40:50 -04:00 committed by Pierre Villard
parent 59754500d5
commit f87d2a2f57
4 changed files with 58 additions and 61 deletions

View File

@ -128,9 +128,8 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr
+ "that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. Users should " + "that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. Users should "
+ "ensure that the FlowFile content does not contain the delimiter character to avoid errors. In order to use a new line character you can " + "ensure that the FlowFile content does not contain the delimiter character to avoid errors. In order to use a new line character you can "
+ "enter '\\n'. For a tab character use '\\t'. Finally for a carriage return use '\\r'.") + "enter '\\n'. For a tab character use '\\t'. Finally for a carriage return use '\\r'.")
.required(true) .required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("\\n")
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor CONNECTION_PER_FLOWFILE = new PropertyDescriptor.Builder() public static final PropertyDescriptor CONNECTION_PER_FLOWFILE = new PropertyDescriptor.Builder()

View File

@ -19,17 +19,20 @@ package org.apache.nifi.processor.util.put.sender;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
/** /**
* Sends messages over an SSLSocketChannel. * Sends messages over an SSLSocketChannel.
*/ */
public class SSLSocketChannelSender extends SocketChannelSender { public class SSLSocketChannelSender extends SocketChannelSender {
private SSLSocketChannel sslChannel;
private SSLContext sslContext; private SSLContext sslContext;
private SSLSocketChannel sslChannel;
private SSLSocketChannelOutputStream sslOutputStream;
public SSLSocketChannelSender(final String host, public SSLSocketChannelSender(final String host,
final int port, final int port,
@ -50,6 +53,7 @@ public class SSLSocketChannelSender extends SocketChannelSender {
// SSLSocketChannel will check if already connected so we can safely call this // SSLSocketChannel will check if already connected so we can safely call this
sslChannel.connect(); sslChannel.connect();
sslOutputStream = new SSLSocketChannelOutputStream(sslChannel);
} }
@Override @Override
@ -65,7 +69,14 @@ public class SSLSocketChannelSender extends SocketChannelSender {
@Override @Override
public void close() { public void close() {
super.close(); super.close();
IOUtils.closeQuietly(sslOutputStream);
IOUtils.closeQuietly(sslChannel); IOUtils.closeQuietly(sslChannel);
sslChannel = null; sslChannel = null;
} }
@Override
public OutputStream getOutputStream() {
return sslOutputStream;
}
} }

View File

@ -21,6 +21,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.io.socket.SocketChannelOutputStream; import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
@ -95,4 +96,9 @@ public class SocketChannelSender extends ChannelSender {
socketChannelOutput = null; socketChannelOutput = null;
channel = null; channel = null;
} }
public OutputStream getOutputStream() {
return socketChannelOutput;
}
} }

View File

@ -16,14 +16,7 @@
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.io.ByteArrayOutputStream; import org.apache.commons.io.IOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
@ -37,14 +30,21 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.put.AbstractPutEventProcessor; import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
import org.apache.nifi.processor.util.put.sender.ChannelSender; import org.apache.nifi.processor.util.put.sender.ChannelSender;
import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
/** /**
* <p> * <p>
@ -178,15 +178,34 @@ public class PutTCP extends AbstractPutEventProcessor {
return; return;
} }
try { // really shouldn't happen since we know the protocol is TCP here, but this is more graceful so we
String outgoingMessageDelimiter = getOutgoingMessageDelimiter(context, flowFile); // can cast to a SocketChannelSender later in order to obtain the OutputStream
ByteArrayOutputStream content = readContent(session, flowFile); if (!(sender instanceof SocketChannelSender)) {
if (outgoingMessageDelimiter != null) { getLogger().error("Processor can only be used with a SocketChannelSender, but obtained: " + sender.getClass().getCanonicalName());
Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); context.yield();
content = appendDelimiter(content, outgoingMessageDelimiter, charset); return;
} }
StopWatch stopWatch = new StopWatch(true);
sender.send(content.toByteArray()); boolean closeSender = isConnectionPerFlowFile(context);
try {
// We might keep the connection open across invocations of the processor so don't auto-close this
final OutputStream out = ((SocketChannelSender)sender).getOutputStream();
final String delimiter = getOutgoingMessageDelimiter(context, flowFile);
final StopWatch stopWatch = new StopWatch(true);
try (final InputStream rawIn = session.read(flowFile);
final BufferedInputStream in = new BufferedInputStream(rawIn)) {
IOUtils.copy(in, out);
if (delimiter != null) {
final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
out.write(delimiter.getBytes(charSet), 0, delimiter.length());
}
out.flush();
} catch (final Exception e) {
closeSender = true;
throw e;
}
session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
session.commit(); session.commit();
@ -194,11 +213,10 @@ public class PutTCP extends AbstractPutEventProcessor {
onFailure(context, session, flowFile); onFailure(context, session, flowFile);
getLogger().error("Exception while handling a process session, transferring {} to failure.", new Object[] { flowFile }, e); getLogger().error("Exception while handling a process session, transferring {} to failure.", new Object[] { flowFile }, e);
} finally { } finally {
// If we are going to use this sender again, then relinquish it back to the pool. if (closeSender) {
if (!isConnectionPerFlowFile(context)) {
relinquishSender(sender);
} else {
sender.close(); sender.close();
} else {
relinquishSender(sender);
} }
} }
} }
@ -220,43 +238,6 @@ public class PutTCP extends AbstractPutEventProcessor {
context.yield(); context.yield();
} }
/**
* Helper method to read the FlowFile content stream into a ByteArrayOutputStream object.
*
* @param session
* - the current process session.
* @param flowFile
* - the FlowFile to read the content from.
*
* @return ByteArrayOutputStream object containing the FlowFile content.
*/
protected ByteArrayOutputStream readContent(final ProcessSession session, final FlowFile flowFile) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) flowFile.getSize() + 1);
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.copy(in, baos);
}
});
return baos;
}
/**
* Helper method to append a delimiter to the message contents.
*
* @param content
* - the message contents.
* @param delimiter
* - the delimiter value.
*
* @return ByteArrayOutputStream object containing the new message contents.
*/
protected ByteArrayOutputStream appendDelimiter(final ByteArrayOutputStream content, final String delimiter, Charset charSet) {
content.write(delimiter.getBytes(charSet), 0, delimiter.length());
return content;
}
/** /**
* Gets the current value of the "Connection Per FlowFile" property. * Gets the current value of the "Connection Per FlowFile" property.
* *