From b0cc6ae7e84365f57be0518423dfb4fd67843b20 Mon Sep 17 00:00:00 2001 From: Matt Brown Date: Mon, 28 Mar 2016 15:37:56 +0100 Subject: [PATCH] NIFI-1630 PutUDP processor created. Signed-off-by: Bryan Bende --- .../util/put/AbstractPutEventProcessor.java | 85 ++++-- .../nifi/processors/splunk/PutSplunk.java | 29 +- .../nifi/processors/standard/PutUDP.java | 196 +++++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../nifi/processors/standard/TestPutUDP.java | 265 ++++++++++++++++++ .../processors/standard/UDPTestServer.java | 86 ++++++ 6 files changed, 623 insertions(+), 39 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/UDPTestServer.java diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java index 961edf5dc0..dfa877d823 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java @@ -71,20 +71,6 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr .defaultValue("1 MB") .required(true) .build(); - public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() - .name("Character Set") - .description("Specifies the character set of the data being sent.") - .required(true) - .defaultValue("UTF-8") - .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) - .build(); - public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() - .name("Timeout") - .description("The timeout for connecting to and communicating with the destination. Does not apply to UDP") - .required(false) - .defaultValue("10 seconds") - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor .Builder().name("Idle Connection Expiration") .description("The amount of time a connection should be held open without being used before closing the connection.") @@ -119,6 +105,20 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) .build(); + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("Specifies the character set of the data being sent.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() + .name("Timeout") + .description("The timeout for connecting to and communicating with the destination. Does not apply to UDP") + .required(false) + .defaultValue("10 seconds") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -144,8 +144,6 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr descriptors.add(HOSTNAME); descriptors.add(PORT); descriptors.add(MAX_SOCKET_SEND_BUFFER_SIZE); - descriptors.add(CHARSET); - descriptors.add(TIMEOUT); descriptors.add(IDLE_EXPIRATION); descriptors.addAll(getAdditionalProperties()); this.descriptors = Collections.unmodifiableList(descriptors); @@ -288,6 +286,61 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr return sender; } + /** + * Helper method to acquire an available ChannelSender from the pool. If the pool is empty then the a new sender is created. + * + * @param context + * - the current process context. + * + * @param session + * - the current process session. + * @param flowFile + * - the FlowFile being processed in this session. + * + * @return ChannelSender - the sender that has been acquired or null if no sender is available and a new sender cannot be created. + */ + protected ChannelSender acquireSender(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) { + ChannelSender sender = senderPool.poll(); + if (sender == null) { + try { + getLogger().debug("No available connections, creating a new one..."); + sender = createSender(context); + } catch (IOException e) { + getLogger().error("No available connections, and unable to create a new one, transferring {} to failure", + new Object[]{flowFile}, e); + session.transfer(flowFile, REL_FAILURE); + session.commit(); + context.yield(); + sender = null; + } + } + + return sender; + } + + + /** + * Helper method to relinquish the ChannelSender back to the pool. If the sender is disconnected or the pool is full + * then the sender is closed and discarded. + * + * @param sender the sender to return or close + */ + protected void relinquishSender(final ChannelSender sender) { + if (sender != null) { + // if the connection is still open then then try to return the sender to the pool. + if (sender.isConnected()) { + boolean returned = senderPool.offer(sender); + // if the pool is full then close the sender. + if (!returned) { + sender.close(); + } + } else { + // probably already closed here, but quietly close anyway to be safe. + sender.close(); + } + } + } + /** * Represents a range of messages from a FlowFile. */ diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java index 39c684322b..aece3b89f4 100644 --- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java @@ -73,6 +73,8 @@ public class PutSplunk extends AbstractPutEventProcessor { @Override protected List getAdditionalProperties() { return Arrays.asList( + TIMEOUT, + CHARSET, PROTOCOL, MESSAGE_DELIMITER, SSL_CONTEXT_SERVICE @@ -151,19 +153,10 @@ public class PutSplunk extends AbstractPutEventProcessor { // get a sender from the pool, or create a new one if the pool is empty // if we can't create a new connection then route flow files to failure and yield - ChannelSender sender = senderPool.poll(); + // acquireSender will handle the routing to failure and yielding + ChannelSender sender = acquireSender(context, session, flowFile); if (sender == null) { - try { - getLogger().debug("No available connections, creating a new one..."); - sender = createSender(context); - } catch (IOException e) { - getLogger().error("No available connections, and unable to create a new one, transferring {} to failure", - new Object[]{flowFile}, e); - session.transfer(flowFile, REL_FAILURE); - session.commit(); - context.yield(); - return; - } + return; } try { @@ -180,17 +173,7 @@ public class PutSplunk extends AbstractPutEventProcessor { } } finally { - // if the connection is still open and no IO errors happened then try to return, if pool is full then close - if (sender.isConnected()) { - boolean returned = senderPool.offer(sender); - if (!returned) { - sender.close(); - } - } else { - // probably already closed here, but quietly close anyway to be safe - sender.close(); - } - + relinquishSender(sender); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java new file mode 100644 index 0000000000..8144786b5b --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.put.AbstractPutEventProcessor; +import org.apache.nifi.processor.util.put.sender.ChannelSender; +import org.apache.nifi.stream.io.StreamUtils; + +/** + *

+ * The PutUDP processor receives a FlowFile and packages the FlowFile content into a single UDP datagram packet which is then transmitted to the configured UDP server. The user must ensure that the + * FlowFile content being fed to this processor is not larger than the maximum size for the underlying UDP transport. The maximum transport size will vary based on the platform setup but is generally + * just under 64KB. FlowFiles will be marked as failed if their content is larger than the maximum transport size. + *

+ * + *

+ * This processor has the following required properties: + *

    + *
  • Hostname - The IP address or host name of the destination UDP server.
  • + *
  • Port - The UDP port of the destination UDP server.
  • + *
+ *

+ * + *

+ * This processor has the following optional properties: + *

    + *
  • Max Size of Socket Send Buffer - The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should + * be. If this value is set too low, the buffer may fill up before the data can be read, and incoming data will be dropped.
  • + *
  • Idle Connection Expiration - The time threshold after which a UDP Datagram sender is deemed eligible for pruning.
  • + *
+ *

+ * + *

+ * The following relationships are required: + *

    + *
  • failure - Where to route FlowFiles that failed to be sent.
  • + *
  • success - Where to route FlowFiles after they were successfully sent to the UDP server.
  • + *
+ *

+ * + */ +@CapabilityDescription("The PutUDP processor receives a FlowFile and packages the FlowFile content into a single UDP datagram packet which is then transmitted to the configured UDP server." + + " The user must ensure that the FlowFile content being fed to this processor is not larger than the maximum size for the underlying UDP transport. The maximum transport size will " + + "vary based on the platform setup but is generally just under 64KB. FlowFiles will be marked as failed if their content is larger than the maximum transport size.") +@InputRequirement(Requirement.INPUT_REQUIRED) +@SeeAlso(ListenUDP.class) +@Tags({ "remote", "egress", "put", "udp" }) +@TriggerWhenEmpty // trigger even when queue is empty so that the processor can check for idle senders to prune. +public class PutUDP extends AbstractPutEventProcessor { + + /** + * Creates a concrete instance of a ChannelSender object to use for sending UDP datagrams. + * + * @param context + * - the current process context. + * + * @return ChannelSender object. + */ + @Override + protected ChannelSender createSender(final ProcessContext context) throws IOException { + final String protocol = UDP_VALUE.getValue(); + final String hostname = context.getProperty(HOSTNAME).getValue(); + final int port = context.getProperty(PORT).asInteger(); + final int bufferSize = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); + + return createSender(protocol, hostname, port, 0, bufferSize, null); + } + + /** + * Creates a Universal Resource Identifier (URI) for this processor. Constructs a URI of the form UDP://host:port where the host and port values are taken from the configured property values. + * + * @param context + * - the current process context. + * + * @return The URI value as a String. + */ + @Override + protected String createTransitUri(final ProcessContext context) { + final String protocol = UDP_VALUE.getValue(); + final String host = context.getProperty(HOSTNAME).getValue(); + final String port = context.getProperty(PORT).getValue(); + + return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString(); + } + + /** + * event handler method to handle the FlowFile being forwarded to the Processor by the framework. The FlowFile contents is sent out as a UDP datagram using an acquired ChannelSender object. If the + * FlowFile contents was sent out successfully then the FlowFile is forwarded to the success relationship. If an error occurred then the FlowFile is forwarded to the failure relationship. + * + * @param context + * - the current process context. + * + * @param sessionFactory + * - a factory object to obtain a process session. + */ + @Override + public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { + final ProcessSession session = sessionFactory.createSession(); + final FlowFile flowFile = session.get(); + if (flowFile == null) { + pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); + context.yield(); + return; + } + + ChannelSender sender = acquireSender(context, session, flowFile); + if (sender == null) { + return; + } + + try { + byte[] content = readContent(session, flowFile); + sender.send(content); + session.transfer(flowFile, REL_SUCCESS); + session.commit(); + } catch (Exception e) { + getLogger().error("Exception while handling a process session, transferring {} to failure.", new Object[] { flowFile }, e); + onFailure(context, session, flowFile); + } finally { + relinquishSender(sender); + } + } + + /** + * event handler method to perform the required actions when a failure has occurred. The FlowFile is penalized, forwarded to the failure relationship and the context is yielded. + * + * @param context + * - the current process context. + * + * @param session + * - the current process session. + * @param flowFile + * - the FlowFile that has failed to have been processed. + */ + protected void onFailure(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) { + session.transfer(session.penalize(flowFile), REL_FAILURE); + session.commit(); + context.yield(); + } + + + + /** + * Helper method to read the FlowFile content stream into a byte array. + * + * @param session + * - the current process session. + * @param flowFile + * - the FlowFile to read the content from. + * + * @return byte array representation of the FlowFile content. + */ + protected byte[] readContent(final ProcessSession session, final FlowFile flowFile) { + final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) flowFile.getSize() + 1); + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.copy(in, baos); + } + }); + + return baos.toByteArray(); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 6c52d289f5..ee6d6d3fb4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -65,6 +65,7 @@ org.apache.nifi.processors.standard.PutJMS org.apache.nifi.processors.standard.PutSFTP org.apache.nifi.processors.standard.PutSQL org.apache.nifi.processors.standard.PutSyslog +org.apache.nifi.processors.standard.PutUDP org.apache.nifi.processors.standard.QueryDatabaseTable org.apache.nifi.processors.standard.ReplaceText org.apache.nifi.processors.standard.RouteText diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java new file mode 100644 index 0000000000..d35b7ea171 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import java.net.DatagramPacket; +import java.net.InetAddress; +import java.util.concurrent.ArrayBlockingQueue; + +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestPutUDP { + + private final static String UDP_SERVER_ADDRESS = "127.0.0.1"; + private final static String UNKNOWN_HOST = "fgdsfgsdffd"; + private final static String INVALID_IP_ADDRESS = "300.300.300.300"; + private final static int UDP_SERVER_PORT = 54674; + private final static int UDP_SERVER_PORT_ALT = 54675; + private final static int MIN_INVALID_PORT = 0; + private final static int MIN_VALID_PORT = 1; + private final static int MAX_VALID_PORT = 65535; + private final static int MAX_INVALID_PORT = 65536; + private final static int BUFFER_SIZE = 1024; + private final static int VALID_LARGE_FILE_SIZE = 32768; + private final static int VALID_SMALL_FILE_SIZE = 64; + private final static int INVALID_LARGE_FILE_SIZE = 1000000; + private final static int LOAD_TEST_ITERATIONS = 500; + private final static int LOAD_TEST_THREAD_COUNT = 1; + private final static int DEFAULT_ITERATIONS = 1; + private final static int DEFAULT_THREAD_COUNT = 1; + private final static char CONTENT_CHAR = 'x'; + private final static int DATA_WAIT_PERIOD = 1000; + private final static int DEFAULT_TEST_TIMEOUT_PERIOD = 10000; + private final static int LONG_TEST_TIMEOUT_PERIOD = 100000; + + private UDPTestServer server; + private TestRunner runner; + private ArrayBlockingQueue recvQueue; + + // Test Data + private final static String[] EMPTY_FILE = { "" }; + private final static String[] VALID_FILES = { "abcdefghijklmnopqrstuvwxyz", "zyxwvutsrqponmlkjihgfedcba", "12345678", "343424222", "!@£$%^&*()_+:|{}[];\\" }; + + @Before + public void setup() throws Exception { + createTestServer(UDP_SERVER_ADDRESS, UDP_SERVER_PORT, BUFFER_SIZE); + runner = TestRunners.newTestRunner(PutUDP.class); + } + + private void createTestServer(final String address, final int port, final int recvQueueSize) throws Exception { + recvQueue = new ArrayBlockingQueue(recvQueueSize); + server = new UDPTestServer(InetAddress.getByName(address), port, recvQueue); + server.startServer(); + } + + @After + public void cleanup() throws Exception { + runner.shutdown(); + removeTestServer(); + } + + private void removeTestServer() { + if (server != null) { + server.shutdownServer(); + server = null; + } + } + + private byte[] getPacketData(final DatagramPacket packet) { + final int length = packet.getLength(); + final byte[] packetData = packet.getData(); + final byte[] resizedPacketData = new byte[length]; + for (int i = 0; i < length; i++) { + resizedPacketData[i] = packetData[i]; + } + return resizedPacketData; + } + + @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) + public void testValidFiles() throws Exception { + configureProperties(UDP_SERVER_ADDRESS, UDP_SERVER_PORT, true); + sendTestData(VALID_FILES); + checkReceivedAllData(VALID_FILES); + checkInputQueueIsEmpty(); + } + + @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) + public void testEmptyFile() throws Exception { + configureProperties(UDP_SERVER_ADDRESS, UDP_SERVER_PORT, true); + sendTestData(EMPTY_FILE); + checkRelationships(EMPTY_FILE.length, 0); + checkNoDataReceived(); + checkInputQueueIsEmpty(); + } + + @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) + public void testlargeValidFile() throws Exception { + configureProperties(UDP_SERVER_ADDRESS, UDP_SERVER_PORT, true); + final String[] testData = createContent(VALID_LARGE_FILE_SIZE); + sendTestData(testData); + checkReceivedAllData(testData); + checkInputQueueIsEmpty(); + } + + @Test(timeout = LONG_TEST_TIMEOUT_PERIOD) + public void testlargeInvalidFile() throws Exception { + configureProperties(UDP_SERVER_ADDRESS, UDP_SERVER_PORT, true); + String[] testData = createContent(INVALID_LARGE_FILE_SIZE); + sendTestData(testData); + checkRelationships(0, testData.length); + checkNoDataReceived(); + checkInputQueueIsEmpty(); + + // Check that the processor recovers and can send the next valid file + testData = createContent(VALID_LARGE_FILE_SIZE); + sendTestData(testData); + checkReceivedAllData(testData); + checkInputQueueIsEmpty(); + } + + @Test(timeout = LONG_TEST_TIMEOUT_PERIOD) + public void testInvalidIPAddress() throws Exception { + configureProperties(INVALID_IP_ADDRESS, UDP_SERVER_PORT, true); + sendTestData(VALID_FILES); + checkNoDataReceived(); + checkRelationships(0, VALID_FILES.length); + checkInputQueueIsEmpty(); + } + + @Test(timeout = LONG_TEST_TIMEOUT_PERIOD) + public void testUnknownHostname() throws Exception { + configureProperties(UNKNOWN_HOST, UDP_SERVER_PORT, true); + sendTestData(VALID_FILES); + checkNoDataReceived(); + checkRelationships(0, VALID_FILES.length); + checkInputQueueIsEmpty(); + } + + @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) + public void testInvalidPort() throws Exception { + configureProperties(UDP_SERVER_ADDRESS, MIN_INVALID_PORT, false); + configureProperties(UDP_SERVER_ADDRESS, MIN_VALID_PORT, true); + configureProperties(UDP_SERVER_ADDRESS, MAX_VALID_PORT, true); + configureProperties(UDP_SERVER_ADDRESS, MAX_INVALID_PORT, false); + } + + @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) + public void testReconfiguration() throws Exception { + configureProperties(UDP_SERVER_ADDRESS, UDP_SERVER_PORT, true); + sendTestData(VALID_FILES); + checkReceivedAllData(VALID_FILES); + reset(UDP_SERVER_ADDRESS, UDP_SERVER_PORT_ALT, BUFFER_SIZE); + configureProperties(UDP_SERVER_ADDRESS, UDP_SERVER_PORT_ALT, true); + sendTestData(VALID_FILES); + checkReceivedAllData(VALID_FILES); + reset(UDP_SERVER_ADDRESS, UDP_SERVER_PORT, BUFFER_SIZE); + configureProperties(UDP_SERVER_ADDRESS, UDP_SERVER_PORT, true); + sendTestData(VALID_FILES); + checkReceivedAllData(VALID_FILES); + checkInputQueueIsEmpty(); + } + + @Test(timeout = LONG_TEST_TIMEOUT_PERIOD) + public void testLoadTest() throws Exception { + final String[] testData = createContent(VALID_SMALL_FILE_SIZE); + configureProperties(UDP_SERVER_ADDRESS, UDP_SERVER_PORT, true); + sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT); + checkReceivedAllData(testData, LOAD_TEST_ITERATIONS); + checkInputQueueIsEmpty(); + } + + private void reset(final String address, final int port, final int recvQueueSize) throws Exception { + runner.clearTransferState(); + removeTestServer(); + createTestServer(address, port, recvQueueSize); + } + + private void configureProperties(final String host, final int port, final boolean expectValid) { + runner.setProperty(PutUDP.HOSTNAME, host); + runner.setProperty(PutUDP.PORT, Integer.toString(port)); + if (expectValid) { + runner.assertValid(); + } else { + runner.assertNotValid(); + } + } + + private void sendTestData(final String[] testData) { + sendTestData(testData, DEFAULT_ITERATIONS, DEFAULT_THREAD_COUNT); + } + + private void sendTestData(final String[] testData, final int iterations, final int threadCount) { + for (String item : testData) { + runner.setThreadCount(threadCount); + for (int i = 0; i < iterations; i++) { + runner.enqueue(item.getBytes()); + } + runner.run(iterations); + } + } + + private void checkRelationships(final int successCount, final int failedCount) { + runner.assertTransferCount(PutUDP.REL_SUCCESS, successCount); + runner.assertTransferCount(PutUDP.REL_FAILURE, failedCount); + } + + private void checkNoDataReceived() throws Exception { + Thread.sleep(DATA_WAIT_PERIOD); + assertNull(recvQueue.poll()); + } + + private void checkInputQueueIsEmpty() { + runner.assertQueueEmpty(); + } + + private void checkReceivedAllData(final String[] sentData) throws Exception { + checkReceivedAllData(sentData, DEFAULT_ITERATIONS); + } + + private void checkReceivedAllData(final String[] sentData, final int iterations) throws Exception { + // check each sent FlowFile was successfully sent and received. + for (String item : sentData) { + for (int i = 0; i < iterations; i++) { + DatagramPacket packet = recvQueue.take(); + assertNotNull(packet); + assertArrayEquals(item.getBytes(), getPacketData(packet)); + } + } + + runner.assertTransferCount(PutUDP.REL_SUCCESS, sentData.length * iterations); + + // Check that we have no unexpected extra data. + assertNull(recvQueue.poll()); + } + + private String[] createContent(final int size) { + final char[] content = new char[size]; + + for (int i = 0; i < size; i++) { + content[i] = CONTENT_CHAR; + } + + return new String[] { new String(content) }; + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/UDPTestServer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/UDPTestServer.java new file mode 100644 index 0000000000..283568856e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/UDPTestServer.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.SocketException; +import java.util.concurrent.ArrayBlockingQueue; + +public class UDPTestServer implements Runnable { + + private final int MAX_DATAGRAM_PACKET_SIZE = 1000000; + private final InetAddress ipAddress; + private final int port; + private volatile DatagramSocket serverSocket; + private final ArrayBlockingQueue recvQueue; + + public UDPTestServer(final InetAddress ipAddress, final int port, final ArrayBlockingQueue recvQueue) { + this.ipAddress = ipAddress; + this.port = port; + this.recvQueue = recvQueue; + } + + public synchronized void startServer() throws SocketException { + if (!isRunning()) { + serverSocket = new DatagramSocket(port, ipAddress); + Thread t = new Thread(this); + t.setName(this.getClass().getSimpleName()); + t.start(); + } + } + + public synchronized void shutdownServer() { + if (isRunning()) { + serverSocket.close(); + serverSocket = null; + } + } + + private DatagramPacket createDatagramPacket() { + return new DatagramPacket(new byte[MAX_DATAGRAM_PACKET_SIZE], MAX_DATAGRAM_PACKET_SIZE); + } + + private void storeReceivedPacket(final DatagramPacket packet) { + recvQueue.add(packet); + } + + private boolean isRunning() { + return serverSocket != null && !serverSocket.isClosed(); + } + + public DatagramPacket getReceivedPacket() { + return recvQueue.poll(); + } + + @Override + public void run() { + try { + while (isRunning()) { + DatagramPacket packet = createDatagramPacket(); + serverSocket.receive(packet); + storeReceivedPacket(packet); + } + } catch (Exception e) { + // Do Nothing + } finally { + shutdownServer(); + } + + } +}