PutTCP Processor created.

Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
Matt Brown 2016-05-02 11:20:43 +01:00 committed by jpercivall
parent ce8a0de368
commit a7c912ac18
5 changed files with 769 additions and 1 deletions

View File

@ -119,7 +119,24 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr
.defaultValue("10 seconds")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor OUTGOING_MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
.name("Outgoing Message Delimiter")
.description("Specifies the delimiter to use when sending messages out over the same TCP stream. The delimiter is appended to each FlowFile message "
+ "that is transmitted over the stream so that the receiver can determine when one message ends and the next message begins. Users should "
+ "ensure that the FlowFile content does not contain the delimiter character to avoid errors. If it is not possible to define a delimiter "
+ "character that is not present in the FlowFile content then the user must use another processor to change the encoding of the data e.g. Base64 "
+ "encoding.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor CONNECTION_PER_FLOWFILE = new PropertyDescriptor.Builder()
.name("Connection Per FlowFile")
.description("Specifies whether to send each FlowFile's content on an individual connection.")
.required(true)
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles that are sent successfully to the destination are sent out this relationship.")

View File

@ -0,0 +1,270 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
import org.apache.nifi.processor.util.put.sender.ChannelSender;
import org.apache.nifi.stream.io.StreamUtils;
/**
* <p>
* The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. By default, the FlowFiles are transmitted over the same TCP
* connection (or pool of TCP connections if multiple input threads are configured). To assist the TCP server with determining message boundaries, an optional "Outgoing Message Delimiter" string can
* be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. An optional "Connection Per FlowFile" parameter can be specified to change the
* behaviour so that each FlowFiles content is transmitted over a single TCP connection which is opened when the FlowFile is received and closed after the FlowFile has been sent. This option should
* only be used for low message volume scenarios, otherwise the platform may run out of TCP sockets.
* </p>
*
* <p>
* This processor has the following required properties:
* <ul>
* <li><b>Hostname</b> - The IP address or host name of the destination TCP server.</li>
* <li><b>Port</b> - The TCP port of the destination TCP server.</li>
* </ul>
* </p>
*
* <p>
* This processor has the following optional properties:
* <ul>
* <li><b>Connection Per FlowFile</b> - Specifies that each FlowFiles content will be transmitted on a separate TCP connection.</li>
* <li><b>Idle Connection Expiration</b> - The time threshold after which a TCP sender is deemed eligible for pruning - the associated TCP connection will be closed after this timeout.</li>
* <li><b>Max Size of Socket Send Buffer</b> - The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should
* be. If this value is set too low, the buffer may fill up before the data can be read, and incoming data will be dropped.</li>
* <li><b>Outgoing Message Delimiter/b> - A string to append to the end of each FlowFiles content to indicate the end of the message to the TCP server.</li>
* <li><b>Timeout</b> - The timeout period for determining an error has occurred whilst connecting or sending data.</li>
* </ul>
* </p>
*
* <p>
* The following relationships are required:
* <ul>
* <li><b>failure</b> - Where to route FlowFiles that failed to be sent.</li>
* <li><b>success</b> - Where to route FlowFiles after they were successfully sent to the TCP server.</li>
* </ul>
* </p>
*
*/
@CapabilityDescription("The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. "
+ "By default, the FlowFiles are transmitted over the same TCP connection (or pool of TCP connections if multiple input threads are configured). "
+ "To assist the TCP server with determining message boundaries, an optional \"Outgoing Message Delimiter\" string can be configured which is appended "
+ "to the end of each FlowFiles content when it is transmitted over the TCP connection. An optional \"Connection Per FlowFile\" parameter can be "
+ "specified to change the behaviour so that each FlowFiles content is transmitted over a single TCP connection which is opened when the FlowFile "
+ "is received and closed after the FlowFile has been sent. This option should only be used for low message volume scenarios, otherwise the platform " + "may run out of TCP sockets.")
@InputRequirement(Requirement.INPUT_REQUIRED)
@SeeAlso(ListenTCP.class)
@Tags({ "remote", "egress", "put", "tcp" })
@TriggerWhenEmpty // trigger even when queue is empty so that the processor can check for idle senders to prune.
public class PutTCP extends AbstractPutEventProcessor {
/**
* Creates a concrete instance of a ChannelSender object to use for sending messages over a TCP stream.
*
* @param context
* - the current process context.
*
* @return ChannelSender object.
*/
@Override
protected ChannelSender createSender(final ProcessContext context) throws IOException {
final String protocol = TCP_VALUE.getValue();
final String hostname = context.getProperty(HOSTNAME).getValue();
final int port = context.getProperty(PORT).asInteger();
final int timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final int bufferSize = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
return createSender(protocol, hostname, port, timeout, bufferSize, null);
}
/**
* Creates a Universal Resource Identifier (URI) for this processor. Constructs a URI of the form TCP://<host>:<port> where the host and port values are taken from the configured property values.
*
* @param context
* - the current process context.
*
* @return The URI value as a String.
*/
@Override
protected String createTransitUri(final ProcessContext context) {
final String protocol = TCP_VALUE.getValue();
final String host = context.getProperty(HOSTNAME).getValue();
final String port = context.getProperty(PORT).getValue();
return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
}
/**
* Get the additional properties that are used by this processor.
*
* @return List of PropertyDescriptors describing the additional properties.
*/
@Override
protected List<PropertyDescriptor> getAdditionalProperties() {
return Arrays.asList(CONNECTION_PER_FLOWFILE, OUTGOING_MESSAGE_DELIMITER, TIMEOUT);
}
/**
* event handler method to handle the FlowFile being forwarded to the Processor by the framework. The FlowFile contents is sent out over a TCP connection using an acquired ChannelSender object. If
* the FlowFile contents was sent out successfully then the FlowFile is forwarded to the success relationship. If an error occurred then the FlowFile is forwarded to the failure relationship.
*
* @param context
* - the current process context.
*
* @param sessionFactory
* - a factory object to obtain a process session.
*/
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
final ProcessSession session = sessionFactory.createSession();
final FlowFile flowFile = session.get();
if (flowFile == null) {
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
context.yield();
return;
}
ChannelSender sender = acquireSender(context, session, flowFile);
if (sender == null) {
return;
}
try {
String delimiter = getDelimiter(context, flowFile);
ByteArrayOutputStream content = readContent(session, flowFile);
if (delimiter != null) {
content = appendDelimiter(content, delimiter);
}
sender.send(content.toByteArray());
session.transfer(flowFile, REL_SUCCESS);
session.commit();
} catch (Exception e) {
getLogger().error("Exception while handling a process session, transferring {} to failure.", new Object[] { flowFile }, e);
onFailure(context, session, flowFile);
} finally {
// If we are going to use this sender again, then relinquish it back to the pool.
if (!isConnectionPerFlowFile(context)) {
relinquishSender(sender);
} else {
sender.close();
}
}
}
/**
* event handler method to perform the required actions when a failure has occurred. The FlowFile is penalized, forwarded to the failure relationship and the context is yielded.
*
* @param context
* - the current process context.
*
* @param session
* - the current process session.
* @param flowFile
* - the FlowFile that has failed to have been processed.
*/
protected void onFailure(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
session.transfer(session.penalize(flowFile), REL_FAILURE);
session.commit();
context.yield();
}
/**
* Helper method to read the FlowFile content stream into a ByteArrayOutputStream object.
*
* @param session
* - the current process session.
* @param flowFile
* - the FlowFile to read the content from.
*
* @return ByteArrayOutputStream object containing the FlowFile content.
*/
protected ByteArrayOutputStream readContent(final ProcessSession session, final FlowFile flowFile) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) flowFile.getSize() + 1);
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.copy(in, baos);
}
});
return baos;
}
/**
* Helper method to append a delimiter to the message contents.
*
* @param content
* - the message contents.
* @param delimiter
* - the delimiter value.
*
* @return ByteArrayOutputStream object containing the new message contents.
*/
protected ByteArrayOutputStream appendDelimiter(final ByteArrayOutputStream content, final String delimiter) {
content.write(delimiter.getBytes(), 0, delimiter.length());
return content;
}
/**
* Gets the current value of the "Outgoing Message Delimiter" property.
*
* @param context
* - the current process context.
* @param flowFile
* - the FlowFile being processed.
*
* @return String containing the Delimiter value.
*/
protected String getDelimiter(final ProcessContext context, final FlowFile flowFile) {
String delimiter = context.getProperty(OUTGOING_MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
if (delimiter != null) {
delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
}
return delimiter;
}
/**
* Gets the current value of the "Connection Per FlowFile" property.
*
* @param context
* - the current process context.
*
* @return boolean value - true if a connection per FlowFile is specified.
*/
protected boolean isConnectionPerFlowFile(final ProcessContext context) {
return context.getProperty(CONNECTION_PER_FLOWFILE).getValue().equalsIgnoreCase("true");
}
}

View File

@ -66,6 +66,7 @@ org.apache.nifi.processors.standard.PutJMS
org.apache.nifi.processors.standard.PutSFTP
org.apache.nifi.processors.standard.PutSQL
org.apache.nifi.processors.standard.PutSyslog
org.apache.nifi.processors.standard.PutTCP
org.apache.nifi.processors.standard.PutUDP
org.apache.nifi.processors.standard.QueryDatabaseTable
org.apache.nifi.processors.standard.ReplaceText

View File

@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
public class TCPTestServer implements Runnable {
private final InetAddress ipAddress;
private final int port;
private final String messageDelimiter;
private volatile ServerSocket serverSocket;
private final ArrayBlockingQueue<List<Byte>> recvQueue;
private volatile Socket connectionSocket;
public final static String DEFAULT_MESSAGE_DELIMITER = "\n";
private volatile int totalNumConnections = 0;
public TCPTestServer(final InetAddress ipAddress, final int port, final ArrayBlockingQueue<List<Byte>> recvQueue) {
this(ipAddress, port, recvQueue, DEFAULT_MESSAGE_DELIMITER);
}
public TCPTestServer(final InetAddress ipAddress, final int port, final ArrayBlockingQueue<List<Byte>> recvQueue, final String messageDelimiter) {
this.ipAddress = ipAddress;
this.port = port;
this.recvQueue = recvQueue;
this.messageDelimiter = messageDelimiter;
}
public synchronized void startServer() throws IOException {
if (!isServerRunning()) {
serverSocket = new ServerSocket(port, 0, ipAddress);
Thread t = new Thread(this);
t.setName(this.getClass().getSimpleName());
t.start();
}
}
public synchronized void shutdown() {
shutdownConnection();
shutdownServer();
}
public synchronized void shutdownServer() {
if (isServerRunning()) {
try {
serverSocket.close();
} catch (IOException ioe) {
// Do Nothing.
}
}
}
public synchronized void shutdownConnection() {
if (isConnected()) {
try {
connectionSocket.close();
} catch (IOException ioe) {
// Do Nothing.
}
}
}
private void storeReceivedMessage(final List<Byte> message) {
recvQueue.add(message);
}
private boolean isServerRunning() {
return serverSocket != null && !serverSocket.isClosed();
}
private boolean isConnected() {
return connectionSocket != null && !connectionSocket.isClosed();
}
public List<Byte> getReceivedMessage() {
return recvQueue.poll();
}
public int getTotalNumConnections() {
return totalNumConnections;
}
protected boolean isDelimiterPresent(final List<Byte> message) {
if (messageDelimiter != null && message.size() >= messageDelimiter.length()) {
for (int i = 1; i <= messageDelimiter.length(); i++) {
if (message.get(message.size() - i).byteValue() == messageDelimiter.charAt(messageDelimiter.length() - i)) {
if (i == messageDelimiter.length()) {
return true;
}
} else {
break;
}
}
}
return false;
}
protected boolean removeDelimiter(final List<Byte> message) {
if (isDelimiterPresent(message)) {
final int messageSize = message.size();
for (int i = 1; i <= messageDelimiter.length(); i++) {
message.remove(messageSize - i);
}
return true;
}
return false;
}
@Override
public void run() {
try {
while (isServerRunning()) {
connectionSocket = serverSocket.accept();
totalNumConnections++;
InputStream in = connectionSocket.getInputStream();
while (isConnected()) {
final List<Byte> message = new ArrayList<Byte>();
while (true) {
final int c = in.read();
if (c < 0) {
if (!message.isEmpty()) {
storeReceivedMessage(message);
}
shutdownConnection();
break;
}
message.add((byte) c);
if (removeDelimiter(message)) {
storeReceivedMessage(message);
break;
}
}
}
}
} catch (Exception e) {
// Do Nothing
} finally {
shutdownConnection();
shutdownServer();
}
}
}

View File

@ -0,0 +1,313 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.net.InetAddress;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestPutTCP {
private final static String TCP_SERVER_ADDRESS = "127.0.0.1";
private final static String UNKNOWN_HOST = "fgdsfgsdffd";
private final static String INVALID_IP_ADDRESS = "300.300.300.300";
private final static int TCP_SERVER_PORT = 54674;
private final static int MIN_INVALID_PORT = 0;
private final static int MIN_VALID_PORT = 1;
private final static int MAX_VALID_PORT = 65535;
private final static int MAX_INVALID_PORT = 65536;
private final static int BUFFER_SIZE = 1024;
private final static int VALID_LARGE_FILE_SIZE = 32768;
private final static int VALID_SMALL_FILE_SIZE = 64;
private final static int LOAD_TEST_ITERATIONS = 500;
private final static int LOAD_TEST_THREAD_COUNT = 1;
private final static int DEFAULT_ITERATIONS = 1;
private final static int DEFAULT_THREAD_COUNT = 1;
private final static char CONTENT_CHAR = 'x';
private final static int DATA_WAIT_PERIOD = 1000;
private final static int DEFAULT_TEST_TIMEOUT_PERIOD = 10000;
private final static int LONG_TEST_TIMEOUT_PERIOD = 100000;
private final static String OUTGOING_MESSAGE_DELIMITER = "\n";
private final static String OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR = "{delimiter}\r\n";
private TCPTestServer server;
private TestRunner runner;
private ArrayBlockingQueue<List<Byte>> recvQueue;
// Test Data
private final static String[] EMPTY_FILE = { "" };
private final static String[] VALID_FILES = { "abcdefghijklmnopqrstuvwxyz", "zyxwvutsrqponmlkjihgfedcba", "12345678", "343424222", "!@£$%^&*()_+:|{}[];\\" };
@Before
public void setup() throws Exception {
recvQueue = new ArrayBlockingQueue<List<Byte>>(BUFFER_SIZE);
runner = TestRunners.newTestRunner(PutTCP.class);
}
private TCPTestServer createTestServer(final String address, final int port, final ArrayBlockingQueue<List<Byte>> recvQueue, final String delimiter) throws Exception {
TCPTestServer server = new TCPTestServer(InetAddress.getByName(address), port, recvQueue, delimiter);
server.startServer();
return server;
}
@After
public void cleanup() throws Exception {
runner.shutdown();
removeTestServer(server);
}
private void removeTestServer(TCPTestServer server) {
if (server != null) {
server.shutdown();
server = null;
}
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testValidFiles() throws Exception {
server = createTestServer(TCP_SERVER_ADDRESS, TCP_SERVER_PORT, recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, TCP_SERVER_PORT, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 1);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testPruneSenders() throws Exception {
server = createTestServer(TCP_SERVER_ADDRESS, TCP_SERVER_PORT, recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, TCP_SERVER_PORT, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
Thread.sleep(10);
checkRelationships(VALID_FILES.length, 0);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 1);
runner.setProperty(PutTCP.IDLE_EXPIRATION, "1 second");
Thread.sleep(2000);
runner.run(1, false, false);
runner.clearTransferState();
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 2);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testMultiCharDelimiter() throws Exception {
server = createTestServer(TCP_SERVER_ADDRESS, TCP_SERVER_PORT, recvQueue, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR);
configureProperties(TCP_SERVER_ADDRESS, TCP_SERVER_PORT, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 1);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testConnectionPerFlowFile() throws Exception {
server = createTestServer(TCP_SERVER_ADDRESS, TCP_SERVER_PORT, recvQueue, null);
configureProperties(TCP_SERVER_ADDRESS, TCP_SERVER_PORT, null, true, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, VALID_FILES.length);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testConnectionFailure() throws Exception {
server = createTestServer(TCP_SERVER_ADDRESS, TCP_SERVER_PORT, recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, TCP_SERVER_PORT, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 1);
removeTestServer(server);
runner.clearTransferState();
sendTestData(VALID_FILES);
Thread.sleep(10);
// checkRelationships(0, 5);
checkNoDataReceived(recvQueue);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 1);
server = createTestServer(TCP_SERVER_ADDRESS, TCP_SERVER_PORT, recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, TCP_SERVER_PORT, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 1);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testEmptyFile() throws Exception {
server = createTestServer(TCP_SERVER_ADDRESS, TCP_SERVER_PORT, recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, TCP_SERVER_PORT, null, false, true);
sendTestData(EMPTY_FILE);
Thread.sleep(10);
checkRelationships(EMPTY_FILE.length, 0);
checkNoDataReceived(recvQueue);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 1);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testlargeValidFile() throws Exception {
server = createTestServer(TCP_SERVER_ADDRESS, TCP_SERVER_PORT, recvQueue, null);
configureProperties(TCP_SERVER_ADDRESS, TCP_SERVER_PORT, null, true, true);
final String[] testData = createContent(VALID_LARGE_FILE_SIZE);
sendTestData(testData);
checkReceivedAllData(recvQueue, testData);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, testData.length);
}
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
public void testInvalidIPAddress() throws Exception {
server = createTestServer(TCP_SERVER_ADDRESS, TCP_SERVER_PORT, recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(INVALID_IP_ADDRESS, TCP_SERVER_PORT, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
Thread.sleep(10);
checkRelationships(0, VALID_FILES.length);
checkNoDataReceived(recvQueue);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 0);
}
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
public void testUnknownHostname() throws Exception {
server = createTestServer(TCP_SERVER_ADDRESS, TCP_SERVER_PORT, recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(UNKNOWN_HOST, TCP_SERVER_PORT, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
Thread.sleep(10);
checkRelationships(0, VALID_FILES.length);
checkNoDataReceived(recvQueue);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 0);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testInvalidPort() throws Exception {
configureProperties(UNKNOWN_HOST, MIN_INVALID_PORT, OUTGOING_MESSAGE_DELIMITER, false, false);
configureProperties(UNKNOWN_HOST, MIN_VALID_PORT, OUTGOING_MESSAGE_DELIMITER, false, true);
configureProperties(UNKNOWN_HOST, MAX_VALID_PORT, OUTGOING_MESSAGE_DELIMITER, false, true);
configureProperties(UNKNOWN_HOST, MAX_INVALID_PORT, OUTGOING_MESSAGE_DELIMITER, false, false);
}
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
public void testLoadTest() throws Exception {
server = createTestServer(TCP_SERVER_ADDRESS, TCP_SERVER_PORT, recvQueue, OUTGOING_MESSAGE_DELIMITER);
Thread.sleep(1000);
final String[] testData = createContent(VALID_SMALL_FILE_SIZE);
configureProperties(TCP_SERVER_ADDRESS, TCP_SERVER_PORT, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT);
checkReceivedAllData(recvQueue, testData, LOAD_TEST_ITERATIONS);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 1);
}
private void checkTotalNumConnections(final TCPTestServer server, final int expectedTotalNumConnections) {
assertEquals(expectedTotalNumConnections, server.getTotalNumConnections());
}
private void configureProperties(final String host, final int port, final String outgoingMessageDelimiter, final boolean connectionPerFlowFile, final boolean expectValid) {
runner.setProperty(PutTCP.HOSTNAME, host);
runner.setProperty(PutTCP.PORT, Integer.toString(port));
if (outgoingMessageDelimiter != null) {
runner.setProperty(PutTCP.OUTGOING_MESSAGE_DELIMITER, outgoingMessageDelimiter);
}
if (connectionPerFlowFile) {
runner.setProperty(PutTCP.CONNECTION_PER_FLOWFILE, "true");
} else {
runner.setProperty(PutTCP.CONNECTION_PER_FLOWFILE, "false");
}
if (expectValid) {
runner.assertValid();
} else {
runner.assertNotValid();
}
}
private void sendTestData(final String[] testData) {
sendTestData(testData, DEFAULT_ITERATIONS, DEFAULT_THREAD_COUNT);
}
private void sendTestData(final String[] testData, final int iterations, final int threadCount) {
runner.setThreadCount(threadCount);
for (int i = 0; i < iterations; i++) {
for (String item : testData) {
runner.enqueue(item.getBytes());
}
runner.run(testData.length, false, i == 0 ? true : false);
}
}
private void checkRelationships(final int successCount, final int failedCount) {
runner.assertTransferCount(PutTCP.REL_SUCCESS, successCount);
runner.assertTransferCount(PutTCP.REL_FAILURE, failedCount);
}
private void checkNoDataReceived(final ArrayBlockingQueue<List<Byte>> recvQueue) throws Exception {
Thread.sleep(DATA_WAIT_PERIOD);
assertNull(recvQueue.poll());
}
private void checkInputQueueIsEmpty() {
runner.assertQueueEmpty();
}
private void checkReceivedAllData(final ArrayBlockingQueue<List<Byte>> recvQueue, final String[] sentData) throws Exception {
checkReceivedAllData(recvQueue, sentData, DEFAULT_ITERATIONS);
}
private void checkReceivedAllData(final ArrayBlockingQueue<List<Byte>> recvQueue, final String[] sentData, final int iterations) throws Exception {
// check each sent FlowFile was successfully sent and received.
for (int i = 0; i < iterations; i++) {
for (String item : sentData) {
List<Byte> message = recvQueue.take();
assertNotNull(message);
Byte[] messageBytes = new Byte[message.size()];
assertArrayEquals(item.getBytes(), ArrayUtils.toPrimitive(message.toArray(messageBytes)));
}
}
runner.assertTransferCount(PutTCP.REL_SUCCESS, sentData.length * iterations);
runner.clearTransferState();
// Check that we have no unexpected extra data.
assertNull(recvQueue.poll());
}
private String[] createContent(final int size) {
final char[] content = new char[size];
for (int i = 0; i < size; i++) {
content[i] = CONTENT_CHAR;
}
return new String[] { new String(content) };
}
}