mirror of https://github.com/apache/nifi.git
NIFI-3231 Added EL support to hostname and port in PutTCP/UDP
This closes #1361. Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
1b4729e448
commit
9b47961d1c
|
@ -56,12 +56,14 @@ public abstract class AbstractPutEventProcessor extends AbstractSessionFactoryPr
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.defaultValue("localhost")
|
.defaultValue("localhost")
|
||||||
.required(true)
|
.required(true)
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
.build();
|
.build();
|
||||||
public static final PropertyDescriptor PORT = new PropertyDescriptor
|
public static final PropertyDescriptor PORT = new PropertyDescriptor
|
||||||
.Builder().name("Port")
|
.Builder().name("Port")
|
||||||
.description("The port on the destination.")
|
.description("The port on the destination.")
|
||||||
.required(true)
|
.required(true)
|
||||||
.addValidator(StandardValidators.PORT_VALIDATOR)
|
.addValidator(StandardValidators.PORT_VALIDATOR)
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
.build();
|
.build();
|
||||||
public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
|
||||||
.name("Max Size of Socket Send Buffer")
|
.name("Max Size of Socket Send Buffer")
|
||||||
|
|
|
@ -103,16 +103,16 @@ public class PutSplunk extends AbstractPutEventProcessor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected String createTransitUri(ProcessContext context) {
|
protected String createTransitUri(ProcessContext context) {
|
||||||
final String port = context.getProperty(PORT).getValue();
|
final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
|
||||||
final String host = context.getProperty(HOSTNAME).getValue();
|
final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
|
||||||
final String protocol = context.getProperty(PROTOCOL).getValue().toLowerCase();
|
final String protocol = context.getProperty(PROTOCOL).getValue().toLowerCase();
|
||||||
return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
|
return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ChannelSender createSender(ProcessContext context) throws IOException {
|
protected ChannelSender createSender(ProcessContext context) throws IOException {
|
||||||
final int port = context.getProperty(PORT).asInteger();
|
final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
|
||||||
final String host = context.getProperty(HOSTNAME).getValue();
|
final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
|
||||||
final String protocol = context.getProperty(PROTOCOL).getValue();
|
final String protocol = context.getProperty(PROTOCOL).getValue();
|
||||||
final int timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
|
final int timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
|
||||||
final int maxSendBuffer = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
|
final int maxSendBuffer = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
|
||||||
|
|
|
@ -107,8 +107,8 @@ public class PutTCP extends AbstractPutEventProcessor {
|
||||||
@Override
|
@Override
|
||||||
protected ChannelSender createSender(final ProcessContext context) throws IOException {
|
protected ChannelSender createSender(final ProcessContext context) throws IOException {
|
||||||
final String protocol = TCP_VALUE.getValue();
|
final String protocol = TCP_VALUE.getValue();
|
||||||
final String hostname = context.getProperty(HOSTNAME).getValue();
|
final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
|
||||||
final int port = context.getProperty(PORT).asInteger();
|
final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
|
||||||
final int timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
|
final int timeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
|
||||||
final int bufferSize = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
|
final int bufferSize = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
|
||||||
final SSLContextService sslContextService = (SSLContextService) context.getProperty(SSL_CONTEXT_SERVICE).asControllerService();
|
final SSLContextService sslContextService = (SSLContextService) context.getProperty(SSL_CONTEXT_SERVICE).asControllerService();
|
||||||
|
@ -133,8 +133,8 @@ public class PutTCP extends AbstractPutEventProcessor {
|
||||||
@Override
|
@Override
|
||||||
protected String createTransitUri(final ProcessContext context) {
|
protected String createTransitUri(final ProcessContext context) {
|
||||||
final String protocol = TCP_VALUE.getValue();
|
final String protocol = TCP_VALUE.getValue();
|
||||||
final String host = context.getProperty(HOSTNAME).getValue();
|
final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
|
||||||
final String port = context.getProperty(PORT).getValue();
|
final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
|
||||||
|
|
||||||
return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
|
return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.nifi.processor.io.InputStreamCallback;
|
||||||
import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
|
import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
|
||||||
import org.apache.nifi.processor.util.put.sender.ChannelSender;
|
import org.apache.nifi.processor.util.put.sender.ChannelSender;
|
||||||
import org.apache.nifi.stream.io.StreamUtils;
|
import org.apache.nifi.stream.io.StreamUtils;
|
||||||
|
import org.apache.nifi.util.StopWatch;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -91,8 +92,8 @@ public class PutUDP extends AbstractPutEventProcessor {
|
||||||
@Override
|
@Override
|
||||||
protected ChannelSender createSender(final ProcessContext context) throws IOException {
|
protected ChannelSender createSender(final ProcessContext context) throws IOException {
|
||||||
final String protocol = UDP_VALUE.getValue();
|
final String protocol = UDP_VALUE.getValue();
|
||||||
final String hostname = context.getProperty(HOSTNAME).getValue();
|
final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
|
||||||
final int port = context.getProperty(PORT).asInteger();
|
final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
|
||||||
final int bufferSize = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
|
final int bufferSize = context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
|
||||||
|
|
||||||
return createSender(protocol, hostname, port, 0, bufferSize, null);
|
return createSender(protocol, hostname, port, 0, bufferSize, null);
|
||||||
|
@ -109,8 +110,8 @@ public class PutUDP extends AbstractPutEventProcessor {
|
||||||
@Override
|
@Override
|
||||||
protected String createTransitUri(final ProcessContext context) {
|
protected String createTransitUri(final ProcessContext context) {
|
||||||
final String protocol = UDP_VALUE.getValue();
|
final String protocol = UDP_VALUE.getValue();
|
||||||
final String host = context.getProperty(HOSTNAME).getValue();
|
final String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
|
||||||
final String port = context.getProperty(PORT).getValue();
|
final String port = context.getProperty(PORT).evaluateAttributeExpressions().getValue();
|
||||||
|
|
||||||
return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
|
return new StringBuilder().append(protocol).append("://").append(host).append(":").append(port).toString();
|
||||||
}
|
}
|
||||||
|
@ -142,7 +143,9 @@ public class PutUDP extends AbstractPutEventProcessor {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
byte[] content = readContent(session, flowFile);
|
byte[] content = readContent(session, flowFile);
|
||||||
|
StopWatch stopWatch = new StopWatch(true);
|
||||||
sender.send(content);
|
sender.send(content);
|
||||||
|
session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
|
||||||
session.transfer(flowFile, REL_SUCCESS);
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
session.commit();
|
session.commit();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
|
@ -23,6 +23,7 @@ import static org.junit.Assert.assertNull;
|
||||||
import java.net.DatagramPacket;
|
import java.net.DatagramPacket;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -33,6 +34,8 @@ import org.junit.Test;
|
||||||
public class TestPutUDP {
|
public class TestPutUDP {
|
||||||
|
|
||||||
private final static String UDP_SERVER_ADDRESS = "127.0.0.1";
|
private final static String UDP_SERVER_ADDRESS = "127.0.0.1";
|
||||||
|
private final static String SERVER_VARIABLE = "ALKJAFLKJDFLSKJSDFLKJSDF";
|
||||||
|
private final static String UDP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE + "}";
|
||||||
private final static String UNKNOWN_HOST = "fgdsfgsdffd";
|
private final static String UNKNOWN_HOST = "fgdsfgsdffd";
|
||||||
private final static String INVALID_IP_ADDRESS = "300.300.300.300";
|
private final static String INVALID_IP_ADDRESS = "300.300.300.300";
|
||||||
private final static int BUFFER_SIZE = 1024;
|
private final static int BUFFER_SIZE = 1024;
|
||||||
|
@ -60,6 +63,7 @@ public class TestPutUDP {
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
createTestServer(UDP_SERVER_ADDRESS, 0, BUFFER_SIZE);
|
createTestServer(UDP_SERVER_ADDRESS, 0, BUFFER_SIZE);
|
||||||
runner = TestRunners.newTestRunner(PutUDP.class);
|
runner = TestRunners.newTestRunner(PutUDP.class);
|
||||||
|
runner.setVariable(SERVER_VARIABLE, UDP_SERVER_ADDRESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createTestServer(final String address, final int port, final int recvQueueSize) throws Exception {
|
private void createTestServer(final String address, final int port, final int recvQueueSize) throws Exception {
|
||||||
|
@ -99,6 +103,14 @@ public class TestPutUDP {
|
||||||
checkInputQueueIsEmpty();
|
checkInputQueueIsEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
|
||||||
|
public void testValidFilesEL() throws Exception {
|
||||||
|
configureProperties(UDP_SERVER_ADDRESS_EL, true);
|
||||||
|
sendTestData(VALID_FILES);
|
||||||
|
checkReceivedAllData(VALID_FILES);
|
||||||
|
checkInputQueueIsEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
|
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
|
||||||
public void testEmptyFile() throws Exception {
|
public void testEmptyFile() throws Exception {
|
||||||
configureProperties(UDP_SERVER_ADDRESS, true);
|
configureProperties(UDP_SERVER_ADDRESS, true);
|
||||||
|
|
|
@ -37,6 +37,8 @@ import static org.junit.Assert.assertNull;
|
||||||
|
|
||||||
public abstract class TestPutTCPCommon {
|
public abstract class TestPutTCPCommon {
|
||||||
private final static String TCP_SERVER_ADDRESS = "127.0.0.1";
|
private final static String TCP_SERVER_ADDRESS = "127.0.0.1";
|
||||||
|
private final static String SERVER_VARIABLE = "ALKJAFLKJDFLSKJSDFLKJSDF";
|
||||||
|
private final static String TCP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE + "}";
|
||||||
private final static String UNKNOWN_HOST = "fgdsfgsdffd";
|
private final static String UNKNOWN_HOST = "fgdsfgsdffd";
|
||||||
private final static String INVALID_IP_ADDRESS = "300.300.300.300";
|
private final static String INVALID_IP_ADDRESS = "300.300.300.300";
|
||||||
private final static int MIN_INVALID_PORT = 0;
|
private final static int MIN_INVALID_PORT = 0;
|
||||||
|
@ -72,6 +74,7 @@ public abstract class TestPutTCPCommon {
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
recvQueue = new ArrayBlockingQueue<List<Byte>>(BUFFER_SIZE);
|
recvQueue = new ArrayBlockingQueue<List<Byte>>(BUFFER_SIZE);
|
||||||
runner = TestRunners.newTestRunner(PutTCP.class);
|
runner = TestRunners.newTestRunner(PutTCP.class);
|
||||||
|
runner.setVariable(SERVER_VARIABLE, TCP_SERVER_ADDRESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized TCPTestServer createTestServer(final String address, final ArrayBlockingQueue<List<Byte>> recvQueue, final String delimiter) throws Exception {
|
private synchronized TCPTestServer createTestServer(final String address, final ArrayBlockingQueue<List<Byte>> recvQueue, final String delimiter) throws Exception {
|
||||||
|
@ -104,6 +107,16 @@ public abstract class TestPutTCPCommon {
|
||||||
checkTotalNumConnections(server, 1);
|
checkTotalNumConnections(server, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
|
||||||
|
public void testValidFilesEL() throws Exception {
|
||||||
|
server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
|
||||||
|
configureProperties(TCP_SERVER_ADDRESS_EL, tcp_server_port, OUTGOING_MESSAGE_DELIMITER, false, true);
|
||||||
|
sendTestData(VALID_FILES);
|
||||||
|
checkReceivedAllData(recvQueue, VALID_FILES);
|
||||||
|
checkInputQueueIsEmpty();
|
||||||
|
checkTotalNumConnections(server, 1);
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
|
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
|
||||||
public void testPruneSenders() throws Exception {
|
public void testPruneSenders() throws Exception {
|
||||||
server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
|
server = createTestServer(TCP_SERVER_ADDRESS, recvQueue, OUTGOING_MESSAGE_DELIMITER);
|
||||||
|
|
Loading…
Reference in New Issue