From 9b47961d1c0dcc29f91c083e0affb99e1db07084 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Wed, 28 Dec 2016 14:39:22 +0100 Subject: [PATCH] NIFI-3231 Added EL support to hostname and port in PutTCP/UDP This closes #1361. Signed-off-by: Bryan Bende --- .../util/put/AbstractPutEventProcessor.java | 2 ++ .../apache/nifi/processors/splunk/PutSplunk.java | 8 ++++---- .../org/apache/nifi/processors/standard/PutTCP.java | 8 ++++---- .../org/apache/nifi/processors/standard/PutUDP.java | 11 +++++++---- .../apache/nifi/processors/standard/TestPutUDP.java | 12 ++++++++++++ .../processors/standard/util/TestPutTCPCommon.java | 13 +++++++++++++ 6 files changed, 42 insertions(+), 12 deletions(-) diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java index 9fd0496e13..65b11ffe5a 100644 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java @@ -56,12 +56,14 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .defaultValue("localhost") .required(true) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor PORT = new PropertyDescriptor .Builder().name("Port") .description("The port on the destination.") .required(true) .addValidator(StandardValidators.PORT_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new PropertyDescriptor.Builder() .name("Max Size of Socket Send Buffer") diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java index 0a09243621..56d3e26037 100644 --- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java @@ -103,16 +103,16 @@ public class PutSplunk extends AbstractPutEventProcessor { @Override protected String createTransitUri(ProcessContext context) { - final String port = context.getProperty(PORT).getValue(); - final String host = context.getProperty(HOSTNAME).getValue(); + final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue(); + final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue(); final String protocol = context.getProperty(PROTOCOL).getValue().toLowerCase(); return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString(); } @Override protected ChannelSender createSender(ProcessContext context) throws IOException { - final int port = context.getProperty(PORT).asInteger(); - final String host = context.getProperty(HOSTNAME).getValue(); + final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger(); + final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue(); final String protocol = context.getProperty(PROTOCOL).getValue(); final int timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); final int maxSendBuffer = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java index 3354b0939c..34f6277609 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java @@ -107,8 +107,8 @@ public class PutTCP extends AbstractPutEventProcessor { @Override protected ChannelSender createSender(final ProcessContext context) throws IOException { final String protocol = TCP_VALUE.getValue(); - final String hostname = context.getProperty(HOSTNAME).getValue(); - final int port = context.getProperty(PORT).asInteger(); + final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue(); + final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger(); final int timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); final int bufferSize = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final SSLContextService sslContextService = (SSLContextService) context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(); @@ -133,8 +133,8 @@ public class PutTCP extends AbstractPutEventProcessor { @Override protected String createTransitUri(final ProcessContext context) { final String protocol = TCP_VALUE.getValue(); - final String host = context.getProperty(HOSTNAME).getValue(); - final String port = context.getProperty(PORT).getValue(); + final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue(); + final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue(); return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString(); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java index 8144786b5b..af23c5441c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java @@ -37,6 +37,7 @@ import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.put.AbstractPutEventProcessor; import org.apache.nifi.processor.util.put.sender.ChannelSender; import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StopWatch; /** *

@@ -91,8 +92,8 @@ public class PutUDP extends AbstractPutEventProcessor { @Override protected ChannelSender createSender(final ProcessContext context) throws IOException { final String protocol = UDP_VALUE.getValue(); - final String hostname = context.getProperty(HOSTNAME).getValue(); - final int port = context.getProperty(PORT).asInteger(); + final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue(); + final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger(); final int bufferSize = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); return createSender(protocol, hostname, port, 0, bufferSize, null); @@ -109,8 +110,8 @@ public class PutUDP extends AbstractPutEventProcessor { @Override protected String createTransitUri(final ProcessContext context) { final String protocol = UDP_VALUE.getValue(); - final String host = context.getProperty(HOSTNAME).getValue(); - final String port = context.getProperty(PORT).getValue(); + final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue(); + final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue(); return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString(); } @@ -142,7 +143,9 @@ public class PutUDP extends AbstractPutEventProcessor { try { byte[] content = readContent(session, flowFile); + StopWatch stopWatch = new StopWatch(true); sender.send(content); + session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.transfer(flowFile, REL_SUCCESS); session.commit(); } catch (Exception e) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java index 30d89c8a70..a1fe11353d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertNull; import java.net.DatagramPacket; import java.net.InetAddress; import java.util.concurrent.ArrayBlockingQueue; + import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.After; @@ -33,6 +34,8 @@ import org.junit.Test; public class TestPutUDP { private final static String UDP_SERVER_ADDRESS = "127.0.0.1"; + private final static String SERVER_VARIABLE = "ALKJAFLKJDFLSKJSDFLKJSDF"; + private final static String UDP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE + "}"; private final static String UNKNOWN_HOST = "fgdsfgsdffd"; private final static String INVALID_IP_ADDRESS = "300.300.300.300"; private final static int BUFFER_SIZE = 1024; @@ -60,6 +63,7 @@ public class TestPutUDP { public void setup() throws Exception { createTestServer(UDP_SERVER_ADDRESS, 0, BUFFER_SIZE); runner = TestRunners.newTestRunner(PutUDP.class); + runner.setVariable(SERVER_VARIABLE, UDP_SERVER_ADDRESS); } private void createTestServer(final String address, final int port, final int recvQueueSize) throws Exception { @@ -99,6 +103,14 @@ public class TestPutUDP { checkInputQueueIsEmpty(); } + @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) + public void testValidFilesEL() throws Exception { + configureProperties(UDP_SERVER_ADDRESS_EL, true); + sendTestData(VALID_FILES); + checkReceivedAllData(VALID_FILES); + checkInputQueueIsEmpty(); + } + @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) public void testEmptyFile() throws Exception { configureProperties(UDP_SERVER_ADDRESS, true); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestPutTCPCommon.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestPutTCPCommon.java index ed52c35783..e07d44a1b5 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestPutTCPCommon.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestPutTCPCommon.java @@ -37,6 +37,8 @@ import static org.junit.Assert.assertNull; public abstract class TestPutTCPCommon { private final static String TCP_SERVER_ADDRESS = "127.0.0.1"; + private final static String SERVER_VARIABLE = "ALKJAFLKJDFLSKJSDFLKJSDF"; + private final static String TCP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE + "}"; private final static String UNKNOWN_HOST = "fgdsfgsdffd"; private final static String INVALID_IP_ADDRESS = "300.300.300.300"; private final static int MIN_INVALID_PORT = 0; @@ -72,6 +74,7 @@ public abstract class TestPutTCPCommon { public void setup() throws Exception { recvQueue = new ArrayBlockingQueue>(BUFFER_SIZE); runner = TestRunners.newTestRunner(PutTCP.class); + runner.setVariable(SERVER_VARIABLE, TCP_SERVER_ADDRESS); } private synchronized TCPTestServer createTestServer(final String address, final ArrayBlockingQueue> recvQueue, final String delimiter) throws Exception { @@ -104,6 +107,16 @@ public abstract class TestPutTCPCommon { checkTotalNumConnections(server, 1); } + @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) + public void testValidFilesEL() throws Exception { + server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER); + configureProperties(TCP_SERVER_ADDRESS_EL, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true); + sendTestData(VALID_FILES); + checkReceivedAllData(recvQueue, VALID_FILES); + checkInputQueueIsEmpty(); + checkTotalNumConnections(server, 1); + } + @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) public void testPruneSenders() throws Exception { server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);