From 494356052139080442bdab0c45e91f41439e4b57 Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Tue, 17 Aug 2021 16:18:54 -0500 Subject: [PATCH] NIFI-9223 Corrected ListenSyslog with default address of 0.0.0.0 - Refactored NettyEventServerFactory to accept nullable InetAddress - Updated unit tests referencing NettyEventServerFactory Signed-off-by: Pierre Villard This closes #5426. --- ...teArrayMessageNettyEventServerFactory.java | 7 +- .../netty/NettyEventServerFactory.java | 5 +- .../StringNettyEventSenderFactoryTest.java | 16 ++++- .../nifi/processors/splunk/TestPutSplunk.java | 31 ++++---- .../processors/standard/ListenSyslog.java | 21 +++--- .../processors/standard/TestPutSyslog.java | 11 ++- .../nifi/processors/standard/TestPutTCP.java | 41 ++++++----- .../nifi/processors/standard/TestPutUDP.java | 70 ++++++------------- 8 files changed, 101 insertions(+), 101 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java index b6cd5bffae..036161e406 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java @@ -27,6 +27,7 @@ import org.apache.nifi.event.transport.netty.channel.LogExceptionChannelHandler; import org.apache.nifi.event.transport.netty.codec.SocketByteArrayMessageDecoder; import org.apache.nifi.logging.ComponentLog; +import java.net.InetAddress; import java.util.Arrays; import java.util.concurrent.BlockingQueue; @@ -40,15 +41,15 @@ public class ByteArrayMessageNettyEventServerFactory extends NettyEventServerFac * Netty Event Server Factory with configurable delimiter and queue of Byte Array Messages * * @param log Component Log - * @param address Remote Address - * @param port Remote Port Number + * @param address Listen Address + * @param port Listen Port Number * @param protocol Channel Protocol * @param delimiter Message Delimiter * @param maxFrameLength Maximum Frame Length for delimited TCP messages * @param messages Blocking Queue for events received */ public ByteArrayMessageNettyEventServerFactory(final ComponentLog log, - final String address, + final InetAddress address, final int port, final TransportProtocol protocol, final byte[] delimiter, diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java index a85ca3febe..4e9901000f 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java @@ -37,6 +37,7 @@ import org.apache.nifi.event.transport.netty.channel.ssl.ServerSslHandlerChannel import org.apache.nifi.security.util.ClientAuth; import javax.net.ssl.SSLContext; +import java.net.InetAddress; import java.time.Duration; import java.util.Collections; import java.util.List; @@ -47,7 +48,7 @@ import java.util.function.Supplier; * Netty Event Server Factory */ public class NettyEventServerFactory extends EventLoopGroupFactory implements EventServerFactory { - private final String address; + private final InetAddress address; private final int port; @@ -65,7 +66,7 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev private Duration shutdownTimeout = ShutdownTimeout.DEFAULT.getDuration(); - public NettyEventServerFactory(final String address, final int port, final TransportProtocol protocol) { + public NettyEventServerFactory(final InetAddress address, final int port, final TransportProtocol protocol) { this.address = address; this.port = port; this.protocol = protocol; diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactoryTest.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactoryTest.java index abf5456783..73615cef75 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactoryTest.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/StringNettyEventSenderFactoryTest.java @@ -36,6 +36,8 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; import javax.net.ssl.SSLContext; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.security.GeneralSecurityException; @@ -50,7 +52,7 @@ import static org.junit.Assert.assertThrows; @RunWith(MockitoJUnitRunner.class) public class StringNettyEventSenderFactoryTest { - private static final String ADDRESS = "127.0.0.1"; + private static final InetAddress ADDRESS; private static final int MAX_FRAME_LENGTH = 1024; @@ -66,6 +68,14 @@ public class StringNettyEventSenderFactoryTest { private static final int SINGLE_THREAD = 1; + static { + try { + ADDRESS = InetAddress.getByName("127.0.0.1"); + } catch (final UnknownHostException e) { + throw new IllegalArgumentException(e); + } + } + @Mock private ComponentLog log; @@ -130,12 +140,12 @@ public class StringNettyEventSenderFactoryTest { assertNotNull("Message not received", messageReceived); final String eventReceived = new String(messageReceived.getMessage(), CHARSET); assertEquals("Message not matched", MESSAGE, eventReceived); - assertEquals("Sender not matched", ADDRESS, messageReceived.getSender()); + assertEquals("Sender not matched", ADDRESS.getHostAddress(), messageReceived.getSender()); } private NettyEventSenderFactory getEventSenderFactory(final int port) { final StringNettyEventSenderFactory senderFactory = new StringNettyEventSenderFactory(log, - ADDRESS, port, TransportProtocol.TCP, CHARSET, LineEnding.UNIX); + ADDRESS.getHostAddress(), port, TransportProtocol.TCP, CHARSET, LineEnding.UNIX); senderFactory.setTimeout(DEFAULT_TIMEOUT); senderFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration()); senderFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration()); diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java index 20af50c8b9..f7ed9066b7 100644 --- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java +++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java @@ -31,7 +31,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import javax.net.ssl.SSLContext; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.HashMap; @@ -56,7 +57,7 @@ public class TestPutSplunk { private static final String LOCALHOST = "localhost"; @Before - public void setup() throws Exception { + public void setup() { runner = TestRunners.newTestRunner(PutSplunk.class); } @@ -251,7 +252,7 @@ public class TestPutSplunk { } @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) - public void testUnableToCreateConnectionShouldRouteToFailure() throws InterruptedException { + public void testUnableToCreateConnectionShouldRouteToFailure() { // Set an unreachable port runner.setProperty(PutSplunk.PORT, String.valueOf(NetworkUtils.getAvailableUdpPort())); @@ -263,26 +264,20 @@ public class TestPutSplunk { } private void createTestServer(final TransportProtocol protocol) { - createTestServer(LOCALHOST, protocol, null); - } - - private void createTestServer(final String address, final TransportProtocol protocol, final SSLContext sslContext) { if (protocol == TransportProtocol.UDP) { - createTestServer(address, NetworkUtils.getAvailableUdpPort(), protocol, sslContext); + createTestServer(NetworkUtils.getAvailableUdpPort(), protocol); } else { - createTestServer(address, NetworkUtils.getAvailableTcpPort(), protocol, sslContext); + createTestServer(NetworkUtils.getAvailableTcpPort(), protocol); } } - private void createTestServer(final String address, final int port, final TransportProtocol protocol, final SSLContext sslContext) { + private void createTestServer(final int port, final TransportProtocol protocol) { messages = new LinkedBlockingQueue<>(); runner.setProperty(PutSplunk.PROTOCOL, protocol.name()); runner.setProperty(PutSplunk.PORT, String.valueOf(port)); final byte[] delimiter = OUTGOING_MESSAGE_DELIMITER.getBytes(CHARSET); - NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(runner.getLogger(), address, port, protocol, delimiter, VALID_LARGE_FILE_SIZE, messages); - if (sslContext != null) { - serverFactory.setSslContext(sslContext); - } + + NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(runner.getLogger(), getListenAddress(), port, protocol, delimiter, VALID_LARGE_FILE_SIZE, messages); serverFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration()); serverFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration()); eventServer = serverFactory.getEventServer(); @@ -298,4 +293,12 @@ public class TestPutSplunk { assertNull("Unexpected extra messages found", messages.poll()); } + + private InetAddress getListenAddress() { + try { + return InetAddress.getByName(LOCALHOST); + } catch (UnknownHostException e) { + throw new IllegalArgumentException(e); + } + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index 7bb017711c..b3b6688fd0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@ -54,6 +54,7 @@ import javax.net.ssl.SSLContext; import java.io.IOException; import java.net.InetAddress; import java.net.NetworkInterface; +import java.net.SocketException; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collection; @@ -194,7 +195,6 @@ public class ListenSyslog extends AbstractSyslogProcessor { protected static final String RECEIVED_COUNTER = "Messages Received"; protected static final String SUCCESS_COUNTER = "FlowFiles Transferred to Success"; - private static final String DEFAULT_ADDRESS = "127.0.0.1"; private static final String DEFAULT_MIME_TYPE = "text/plain"; private Set relationships; @@ -282,15 +282,9 @@ public class ListenSyslog extends AbstractSyslogProcessor { parser = new SyslogParser(charset); syslogEvents = new LinkedBlockingQueue<>(maxMessageQueueSize); - String address = DEFAULT_ADDRESS; - if (StringUtils.isNotEmpty(networkInterfaceName)) { - final NetworkInterface networkInterface = NetworkInterface.getByName(networkInterfaceName); - final InetAddress interfaceAddress = networkInterface.getInetAddresses().nextElement(); - address = interfaceAddress.getHostName(); - } - + final InetAddress address = getListenAddress(networkInterfaceName); final ByteArrayMessageNettyEventServerFactory factory = new ByteArrayMessageNettyEventServerFactory(getLogger(), - address,port, protocol, messageDemarcatorBytes, receiveBufferSize, syslogEvents); + address, port, protocol, messageDemarcatorBytes, receiveBufferSize, syslogEvents); factory.setThreadNamePrefix(String.format("%s[%s]", ListenSyslog.class.getSimpleName(), getIdentifier())); final int maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue(); factory.setWorkerThreads(maxConnections); @@ -402,6 +396,15 @@ public class ListenSyslog extends AbstractSyslogProcessor { } } + private InetAddress getListenAddress(final String networkInterfaceName) throws SocketException { + InetAddress listenAddress = null; + if (StringUtils.isNotEmpty(networkInterfaceName)) { + final NetworkInterface networkInterface = NetworkInterface.getByName(networkInterfaceName); + listenAddress = networkInterface.getInetAddresses().nextElement(); + } + return listenAddress; + } + private SyslogEvent parseSyslogEvent(final ByteArrayMessage rawSyslogEvent) { final String sender = rawSyslogEvent.getSender(); final byte[] message = rawSyslogEvent.getMessage(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java index bf6ce89f16..03d15aba63 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSyslog.java @@ -31,6 +31,8 @@ import org.apache.nifi.util.TestRunners; import org.junit.Before; import org.junit.Test; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Collections; @@ -74,12 +76,15 @@ public class TestPutSyslog { private TestRunner runner; - private TransportProtocol protocol = TransportProtocol.UDP; + private final TransportProtocol protocol = TransportProtocol.UDP; + + private InetAddress address; private int port; @Before - public void setRunner() { + public void setRunner() throws UnknownHostException { + address = InetAddress.getByName(ADDRESS); port = NetworkUtils.getAvailableUdpPort(); runner = TestRunners.newTestRunner(PutSyslog.class); runner.setProperty(PutSyslog.HOSTNAME, ADDRESS); @@ -132,7 +137,7 @@ public class TestPutSyslog { private void assertSyslogMessageSuccess(final String expectedSyslogMessage, final Map attributes) throws InterruptedException { final BlockingQueue messages = new LinkedBlockingQueue<>(); final byte[] delimiter = DELIMITER.getBytes(CHARSET); - final NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(runner.getLogger(), ADDRESS, port, protocol, delimiter, MAX_FRAME_LENGTH, messages); + final NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(runner.getLogger(), address, port, protocol, delimiter, MAX_FRAME_LENGTH, messages); serverFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration()); serverFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration()); final EventServer eventServer = serverFactory.getEventServer(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java index aa52f1b57c..baad1e8a97 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java @@ -39,6 +39,7 @@ import org.junit.rules.Timeout; import org.mockito.Mockito; import javax.net.ssl.SSLContext; +import java.net.InetAddress; import java.util.Arrays; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -46,6 +47,7 @@ import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class TestPutTCP { private final static String TCP_SERVER_ADDRESS = "127.0.0.1"; @@ -72,7 +74,6 @@ public class TestPutTCP { private EventServer eventServer; private int port; - private TransportProtocol PROTOCOL = TransportProtocol.TCP; private TestRunner runner; private BlockingQueue messages; @@ -107,7 +108,7 @@ public class TestPutTCP { @Test public void testRunSuccess() throws Exception { configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false); - createTestServer(TCP_SERVER_ADDRESS, port); + createTestServer(port); sendTestData(VALID_FILES); assertMessagesReceived(VALID_FILES); } @@ -126,7 +127,7 @@ public class TestPutTCP { runner.enableControllerService(sslContextService); runner.setProperty(PutTCP.SSL_CONTEXT_SERVICE, identifier); configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false); - createTestServer(TCP_SERVER_ADDRESS, port, sslContext); + createTestServer(port, sslContext); sendTestData(VALID_FILES); assertMessagesReceived(VALID_FILES); } @@ -134,7 +135,7 @@ public class TestPutTCP { @Test public void testRunSuccessServerVariableExpression() throws Exception { configureProperties(TCP_SERVER_ADDRESS_EL, OUTGOING_MESSAGE_DELIMITER, false); - createTestServer(TCP_SERVER_ADDRESS, port); + createTestServer(port); sendTestData(VALID_FILES); assertMessagesReceived(VALID_FILES); } @@ -142,7 +143,7 @@ public class TestPutTCP { @Test public void testRunSuccessPruneSenders() throws Exception { configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false); - createTestServer(TCP_SERVER_ADDRESS, port); + createTestServer(port); sendTestData(VALID_FILES); assertTransfers(VALID_FILES.length); assertMessagesReceived(VALID_FILES); @@ -158,7 +159,7 @@ public class TestPutTCP { @Test public void testRunSuccessMultiCharDelimiter() throws Exception { configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false); - createTestServer(TCP_SERVER_ADDRESS, port); + createTestServer(port); sendTestData(VALID_FILES); assertMessagesReceived(VALID_FILES); } @@ -166,7 +167,7 @@ public class TestPutTCP { @Test public void testRunSuccessConnectionPerFlowFile() throws Exception { configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true); - createTestServer(TCP_SERVER_ADDRESS, port); + createTestServer(port); sendTestData(VALID_FILES); assertMessagesReceived(VALID_FILES); } @@ -174,7 +175,7 @@ public class TestPutTCP { @Test public void testRunSuccessConnectionFailure() throws Exception { configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false); - createTestServer(TCP_SERVER_ADDRESS, port); + createTestServer(port); sendTestData(VALID_FILES); assertMessagesReceived(VALID_FILES); @@ -184,7 +185,7 @@ public class TestPutTCP { runner.assertQueueEmpty(); configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false); - createTestServer(TCP_SERVER_ADDRESS, port); + createTestServer(port); sendTestData(VALID_FILES); assertMessagesReceived(VALID_FILES); } @@ -192,7 +193,7 @@ public class TestPutTCP { @Test public void testRunSuccessEmptyFile() throws Exception { configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false); - createTestServer(TCP_SERVER_ADDRESS, port); + createTestServer(port); sendTestData(EMPTY_FILE); assertTransfers(1); runner.assertQueueEmpty(); @@ -201,7 +202,7 @@ public class TestPutTCP { @Test public void testRunSuccessLargeValidFile() throws Exception { configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true); - createTestServer(TCP_SERVER_ADDRESS, port); + createTestServer(port); final String[] testData = createContent(VALID_LARGE_FILE_SIZE); sendTestData(testData); assertMessagesReceived(testData); @@ -210,17 +211,23 @@ public class TestPutTCP { @Test public void testRunSuccessFiveHundredMessages() throws Exception { configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false); - createTestServer(TCP_SERVER_ADDRESS, port); + createTestServer(port); Thread.sleep(1000); final String[] testData = createContent(VALID_SMALL_FILE_SIZE); sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT); assertMessagesReceived(testData, LOAD_TEST_ITERATIONS); } - private void createTestServer(final String address, final int port, final SSLContext sslContext) throws Exception { + private void createTestServer(final int port) throws Exception { + createTestServer(port, null); + } + + private void createTestServer(final int port, final SSLContext sslContext) throws Exception { messages = new LinkedBlockingQueue<>(); final byte[] delimiter = getDelimiter(); - NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(runner.getLogger(), address, port, PROTOCOL, delimiter, VALID_LARGE_FILE_SIZE, messages); + final InetAddress listenAddress = InetAddress.getByName(TCP_SERVER_ADDRESS); + NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(runner.getLogger(), + listenAddress, port, TransportProtocol.TCP, delimiter, VALID_LARGE_FILE_SIZE, messages); if (sslContext != null) { serverFactory.setSslContext(sslContext); } @@ -229,10 +236,6 @@ public class TestPutTCP { eventServer = serverFactory.getEventServer(); } - private void createTestServer(final String address, final int port) throws Exception { - createTestServer(address, port, null); - } - private void shutdownServer() { if (eventServer != null) { eventServer.shutdown(); @@ -280,7 +283,7 @@ public class TestPutTCP { for (String item : sentData) { final ByteArrayMessage message = messages.take(); assertNotNull(String.format("Message [%d] not found", i), message); - assert(Arrays.asList(sentData).contains(new String(message.getMessage()))); + assertTrue(Arrays.asList(sentData).contains(new String(message.getMessage()))); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java index 1232d99c13..4884e7d11f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import java.net.InetAddress; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.concurrent.BlockingQueue; @@ -37,7 +38,6 @@ import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; public class TestPutUDP { @@ -45,8 +45,6 @@ public class TestPutUDP { private final static String UDP_SERVER_ADDRESS = "127.0.0.1"; private final static String SERVER_VARIABLE = "ALKJAFLKJDFLSKJSDFLKJSDF"; private final static String UDP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE + "}"; - private final static String UNKNOWN_HOST = "fgdsfgsdffd"; - private final static String INVALID_IP_ADDRESS = "300.300.300.300"; private static final String DELIMITER = "\n"; private static final Charset CHARSET = StandardCharsets.UTF_8; private final static int MAX_FRAME_LENGTH = 32800; @@ -64,7 +62,6 @@ public class TestPutUDP { private TestRunner runner; private int port; - private TransportProtocol PROTOCOL = TransportProtocol.UDP; private EventServer eventServer; private BlockingQueue messages; @@ -78,13 +75,15 @@ public class TestPutUDP { runner = TestRunners.newTestRunner(PutUDP.class); runner.setVariable(SERVER_VARIABLE, UDP_SERVER_ADDRESS); port = NetworkUtils.getAvailableUdpPort(); - createTestServer(UDP_SERVER_ADDRESS, port, VALID_LARGE_FILE_SIZE); + createTestServer(port, VALID_LARGE_FILE_SIZE); } - private void createTestServer(final String address, final int port, final int frameSize) throws Exception { + private void createTestServer(final int port, final int frameSize) throws Exception { messages = new LinkedBlockingQueue<>(); final byte[] delimiter = DELIMITER.getBytes(CHARSET); - NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(runner.getLogger(), address, port, PROTOCOL, delimiter, frameSize, messages); + final InetAddress listenAddress = InetAddress.getByName(UDP_SERVER_ADDRESS); + NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory( + runner.getLogger(), listenAddress, port, TransportProtocol.UDP, delimiter, frameSize, messages); serverFactory.setSocketReceiveBuffer(MAX_FRAME_LENGTH); serverFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration()); serverFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration()); @@ -92,7 +91,7 @@ public class TestPutUDP { } @After - public void cleanup() throws Exception { + public void cleanup() { runner.shutdown(); removeTestServer(); } @@ -106,7 +105,7 @@ public class TestPutUDP { @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) public void testValidFiles() throws Exception { - configureProperties(UDP_SERVER_ADDRESS, true); + configureProperties(UDP_SERVER_ADDRESS); sendTestData(VALID_FILES); checkReceivedAllData(VALID_FILES); checkInputQueueIsEmpty(); @@ -114,7 +113,7 @@ public class TestPutUDP { @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) public void testValidFilesEL() throws Exception { - configureProperties(UDP_SERVER_ADDRESS_EL, true); + configureProperties(UDP_SERVER_ADDRESS_EL); sendTestData(VALID_FILES); checkReceivedAllData(VALID_FILES); checkInputQueueIsEmpty(); @@ -122,7 +121,7 @@ public class TestPutUDP { @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) public void testEmptyFile() throws Exception { - configureProperties(UDP_SERVER_ADDRESS, true); + configureProperties(UDP_SERVER_ADDRESS); sendTestData(EMPTY_FILE); checkRelationships(EMPTY_FILE.length, 0); checkNoDataReceived(); @@ -131,7 +130,7 @@ public class TestPutUDP { @Test(timeout = LONG_TEST_TIMEOUT_PERIOD) public void testLargeValidFile() throws Exception { - configureProperties(UDP_SERVER_ADDRESS, true); + configureProperties(UDP_SERVER_ADDRESS); final String[] testData = createContent(VALID_LARGE_FILE_SIZE); sendTestData(testData); checkReceivedAllData(testData); @@ -140,7 +139,7 @@ public class TestPutUDP { @Test(timeout = LONG_TEST_TIMEOUT_PERIOD) public void testLargeInvalidFile() throws Exception { - configureProperties(UDP_SERVER_ADDRESS, true); + configureProperties(UDP_SERVER_ADDRESS); String[] testData = createContent(INVALID_LARGE_FILE_SIZE); sendTestData(testData); checkRelationships(0, testData.length); @@ -148,37 +147,17 @@ public class TestPutUDP { checkInputQueueIsEmpty(); } - @Ignore("This test is failing intermittently as documented in NIFI-4288") - @Test(timeout = LONG_TEST_TIMEOUT_PERIOD) - public void testInvalidIPAddress() throws Exception { - configureProperties(INVALID_IP_ADDRESS, true); - sendTestData(VALID_FILES); - checkNoDataReceived(); - checkRelationships(0, VALID_FILES.length); - checkInputQueueIsEmpty(); - } - - @Ignore("This test is failing incorrectly as documented in NIFI-1795") - @Test(timeout = LONG_TEST_TIMEOUT_PERIOD) - public void testUnknownHostname() throws Exception { - configureProperties(UNKNOWN_HOST, true); - sendTestData(VALID_FILES); - checkNoDataReceived(); - checkRelationships(0, VALID_FILES.length); - checkInputQueueIsEmpty(); - } - @Test(timeout = LONG_TEST_TIMEOUT_PERIOD) public void testReconfiguration() throws Exception { - configureProperties(UDP_SERVER_ADDRESS, true); + configureProperties(UDP_SERVER_ADDRESS); sendTestData(VALID_FILES); checkReceivedAllData(VALID_FILES); - reset(UDP_SERVER_ADDRESS, port, MAX_FRAME_LENGTH); - configureProperties(UDP_SERVER_ADDRESS, true); + reset(port); + configureProperties(UDP_SERVER_ADDRESS); sendTestData(VALID_FILES); checkReceivedAllData(VALID_FILES); - reset(UDP_SERVER_ADDRESS, port, MAX_FRAME_LENGTH); - configureProperties(UDP_SERVER_ADDRESS, true); + reset(port); + configureProperties(UDP_SERVER_ADDRESS); sendTestData(VALID_FILES); checkReceivedAllData(VALID_FILES); checkInputQueueIsEmpty(); @@ -187,28 +166,23 @@ public class TestPutUDP { @Test(timeout = LONG_TEST_TIMEOUT_PERIOD) public void testLoadTest() throws Exception { final String[] testData = createContent(VALID_SMALL_FILE_SIZE); - configureProperties(UDP_SERVER_ADDRESS, true); + configureProperties(UDP_SERVER_ADDRESS); sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT); checkReceivedAllData(testData, LOAD_TEST_ITERATIONS); checkInputQueueIsEmpty(); } - private void reset(final String address, final int port, final int frameSize) throws Exception { + private void reset(final int port) throws Exception { runner.clearTransferState(); removeTestServer(); - createTestServer(address, port, frameSize); + createTestServer(port, MAX_FRAME_LENGTH); } - private void configureProperties(final String host, final boolean expectValid) { + private void configureProperties(final String host) { runner.setProperty(PutUDP.HOSTNAME, host); runner.setProperty(PutUDP.PORT, Integer.toString(port)); runner.setProperty(PutUDP.MAX_SOCKET_SEND_BUFFER_SIZE, "40000B"); - - if (expectValid) { - runner.assertValid(); - } else { - runner.assertNotValid(); - } + runner.assertValid(); } private void sendTestData(final String[] testData) throws InterruptedException {