mirror of https://github.com/apache/nifi.git
NIFI-9223 Corrected ListenSyslog with default address of 0.0.0.0
- Refactored NettyEventServerFactory to accept nullable InetAddress - Updated unit tests referencing NettyEventServerFactory Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #5426.
This commit is contained in:
parent
e16a6c2b89
commit
4943560521
|
@ -27,6 +27,7 @@ import org.apache.nifi.event.transport.netty.channel.LogExceptionChannelHandler;
|
|||
import org.apache.nifi.event.transport.netty.codec.SocketByteArrayMessageDecoder;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
||||
|
@ -40,15 +41,15 @@ public class ByteArrayMessageNettyEventServerFactory extends NettyEventServerFac
|
|||
* Netty Event Server Factory with configurable delimiter and queue of Byte Array Messages
|
||||
*
|
||||
* @param log Component Log
|
||||
* @param address Remote Address
|
||||
* @param port Remote Port Number
|
||||
* @param address Listen Address
|
||||
* @param port Listen Port Number
|
||||
* @param protocol Channel Protocol
|
||||
* @param delimiter Message Delimiter
|
||||
* @param maxFrameLength Maximum Frame Length for delimited TCP messages
|
||||
* @param messages Blocking Queue for events received
|
||||
*/
|
||||
public ByteArrayMessageNettyEventServerFactory(final ComponentLog log,
|
||||
final String address,
|
||||
final InetAddress address,
|
||||
final int port,
|
||||
final TransportProtocol protocol,
|
||||
final byte[] delimiter,
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.nifi.event.transport.netty.channel.ssl.ServerSslHandlerChannel
|
|||
import org.apache.nifi.security.util.ClientAuth;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.net.InetAddress;
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
@ -47,7 +48,7 @@ import java.util.function.Supplier;
|
|||
* Netty Event Server Factory
|
||||
*/
|
||||
public class NettyEventServerFactory extends EventLoopGroupFactory implements EventServerFactory {
|
||||
private final String address;
|
||||
private final InetAddress address;
|
||||
|
||||
private final int port;
|
||||
|
||||
|
@ -65,7 +66,7 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev
|
|||
|
||||
private Duration shutdownTimeout = ShutdownTimeout.DEFAULT.getDuration();
|
||||
|
||||
public NettyEventServerFactory(final String address, final int port, final TransportProtocol protocol) {
|
||||
public NettyEventServerFactory(final InetAddress address, final int port, final TransportProtocol protocol) {
|
||||
this.address = address;
|
||||
this.port = port;
|
||||
this.protocol = protocol;
|
||||
|
|
|
@ -36,6 +36,8 @@ import org.mockito.Mock;
|
|||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.GeneralSecurityException;
|
||||
|
@ -50,7 +52,7 @@ import static org.junit.Assert.assertThrows;
|
|||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class StringNettyEventSenderFactoryTest {
|
||||
private static final String ADDRESS = "127.0.0.1";
|
||||
private static final InetAddress ADDRESS;
|
||||
|
||||
private static final int MAX_FRAME_LENGTH = 1024;
|
||||
|
||||
|
@ -66,6 +68,14 @@ public class StringNettyEventSenderFactoryTest {
|
|||
|
||||
private static final int SINGLE_THREAD = 1;
|
||||
|
||||
static {
|
||||
try {
|
||||
ADDRESS = InetAddress.getByName("127.0.0.1");
|
||||
} catch (final UnknownHostException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Mock
|
||||
private ComponentLog log;
|
||||
|
||||
|
@ -130,12 +140,12 @@ public class StringNettyEventSenderFactoryTest {
|
|||
assertNotNull("Message not received", messageReceived);
|
||||
final String eventReceived = new String(messageReceived.getMessage(), CHARSET);
|
||||
assertEquals("Message not matched", MESSAGE, eventReceived);
|
||||
assertEquals("Sender not matched", ADDRESS, messageReceived.getSender());
|
||||
assertEquals("Sender not matched", ADDRESS.getHostAddress(), messageReceived.getSender());
|
||||
}
|
||||
|
||||
private NettyEventSenderFactory<String> getEventSenderFactory(final int port) {
|
||||
final StringNettyEventSenderFactory senderFactory = new StringNettyEventSenderFactory(log,
|
||||
ADDRESS, port, TransportProtocol.TCP, CHARSET, LineEnding.UNIX);
|
||||
ADDRESS.getHostAddress(), port, TransportProtocol.TCP, CHARSET, LineEnding.UNIX);
|
||||
senderFactory.setTimeout(DEFAULT_TIMEOUT);
|
||||
senderFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
|
||||
senderFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
|
||||
|
|
|
@ -31,7 +31,8 @@ import org.junit.After;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
|
@ -56,7 +57,7 @@ public class TestPutSplunk {
|
|||
private static final String LOCALHOST = "localhost";
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
public void setup() {
|
||||
runner = TestRunners.newTestRunner(PutSplunk.class);
|
||||
}
|
||||
|
||||
|
@ -251,7 +252,7 @@ public class TestPutSplunk {
|
|||
}
|
||||
|
||||
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
|
||||
public void testUnableToCreateConnectionShouldRouteToFailure() throws InterruptedException {
|
||||
public void testUnableToCreateConnectionShouldRouteToFailure() {
|
||||
// Set an unreachable port
|
||||
runner.setProperty(PutSplunk.PORT, String.valueOf(NetworkUtils.getAvailableUdpPort()));
|
||||
|
||||
|
@ -263,26 +264,20 @@ public class TestPutSplunk {
|
|||
}
|
||||
|
||||
private void createTestServer(final TransportProtocol protocol) {
|
||||
createTestServer(LOCALHOST, protocol, null);
|
||||
}
|
||||
|
||||
private void createTestServer(final String address, final TransportProtocol protocol, final SSLContext sslContext) {
|
||||
if (protocol == TransportProtocol.UDP) {
|
||||
createTestServer(address, NetworkUtils.getAvailableUdpPort(), protocol, sslContext);
|
||||
createTestServer(NetworkUtils.getAvailableUdpPort(), protocol);
|
||||
} else {
|
||||
createTestServer(address, NetworkUtils.getAvailableTcpPort(), protocol, sslContext);
|
||||
createTestServer(NetworkUtils.getAvailableTcpPort(), protocol);
|
||||
}
|
||||
}
|
||||
|
||||
private void createTestServer(final String address, final int port, final TransportProtocol protocol, final SSLContext sslContext) {
|
||||
private void createTestServer(final int port, final TransportProtocol protocol) {
|
||||
messages = new LinkedBlockingQueue<>();
|
||||
runner.setProperty(PutSplunk.PROTOCOL, protocol.name());
|
||||
runner.setProperty(PutSplunk.PORT, String.valueOf(port));
|
||||
final byte[] delimiter = OUTGOING_MESSAGE_DELIMITER.getBytes(CHARSET);
|
||||
NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(runner.getLogger(), address, port, protocol, delimiter, VALID_LARGE_FILE_SIZE, messages);
|
||||
if (sslContext != null) {
|
||||
serverFactory.setSslContext(sslContext);
|
||||
}
|
||||
|
||||
NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(runner.getLogger(), getListenAddress(), port, protocol, delimiter, VALID_LARGE_FILE_SIZE, messages);
|
||||
serverFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
|
||||
serverFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
|
||||
eventServer = serverFactory.getEventServer();
|
||||
|
@ -298,4 +293,12 @@ public class TestPutSplunk {
|
|||
|
||||
assertNull("Unexpected extra messages found", messages.poll());
|
||||
}
|
||||
|
||||
private InetAddress getListenAddress() {
|
||||
try {
|
||||
return InetAddress.getByName(LOCALHOST);
|
||||
} catch (UnknownHostException e) {
|
||||
throw new IllegalArgumentException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,6 +54,7 @@ import javax.net.ssl.SSLContext;
|
|||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.NetworkInterface;
|
||||
import java.net.SocketException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -194,7 +195,6 @@ public class ListenSyslog extends AbstractSyslogProcessor {
|
|||
|
||||
protected static final String RECEIVED_COUNTER = "Messages Received";
|
||||
protected static final String SUCCESS_COUNTER = "FlowFiles Transferred to Success";
|
||||
private static final String DEFAULT_ADDRESS = "127.0.0.1";
|
||||
private static final String DEFAULT_MIME_TYPE = "text/plain";
|
||||
|
||||
private Set<Relationship> relationships;
|
||||
|
@ -282,15 +282,9 @@ public class ListenSyslog extends AbstractSyslogProcessor {
|
|||
parser = new SyslogParser(charset);
|
||||
syslogEvents = new LinkedBlockingQueue<>(maxMessageQueueSize);
|
||||
|
||||
String address = DEFAULT_ADDRESS;
|
||||
if (StringUtils.isNotEmpty(networkInterfaceName)) {
|
||||
final NetworkInterface networkInterface = NetworkInterface.getByName(networkInterfaceName);
|
||||
final InetAddress interfaceAddress = networkInterface.getInetAddresses().nextElement();
|
||||
address = interfaceAddress.getHostName();
|
||||
}
|
||||
|
||||
final InetAddress address = getListenAddress(networkInterfaceName);
|
||||
final ByteArrayMessageNettyEventServerFactory factory = new ByteArrayMessageNettyEventServerFactory(getLogger(),
|
||||
address,port, protocol, messageDemarcatorBytes, receiveBufferSize, syslogEvents);
|
||||
address, port, protocol, messageDemarcatorBytes, receiveBufferSize, syslogEvents);
|
||||
factory.setThreadNamePrefix(String.format("%s[%s]", ListenSyslog.class.getSimpleName(), getIdentifier()));
|
||||
final int maxConnections = context.getProperty(MAX_CONNECTIONS).asLong().intValue();
|
||||
factory.setWorkerThreads(maxConnections);
|
||||
|
@ -402,6 +396,15 @@ public class ListenSyslog extends AbstractSyslogProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
private InetAddress getListenAddress(final String networkInterfaceName) throws SocketException {
|
||||
InetAddress listenAddress = null;
|
||||
if (StringUtils.isNotEmpty(networkInterfaceName)) {
|
||||
final NetworkInterface networkInterface = NetworkInterface.getByName(networkInterfaceName);
|
||||
listenAddress = networkInterface.getInetAddresses().nextElement();
|
||||
}
|
||||
return listenAddress;
|
||||
}
|
||||
|
||||
private SyslogEvent parseSyslogEvent(final ByteArrayMessage rawSyslogEvent) {
|
||||
final String sender = rawSyslogEvent.getSender();
|
||||
final byte[] message = rawSyslogEvent.getMessage();
|
||||
|
|
|
@ -31,6 +31,8 @@ import org.apache.nifi.util.TestRunners;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Collections;
|
||||
|
@ -74,12 +76,15 @@ public class TestPutSyslog {
|
|||
|
||||
private TestRunner runner;
|
||||
|
||||
private TransportProtocol protocol = TransportProtocol.UDP;
|
||||
private final TransportProtocol protocol = TransportProtocol.UDP;
|
||||
|
||||
private InetAddress address;
|
||||
|
||||
private int port;
|
||||
|
||||
@Before
|
||||
public void setRunner() {
|
||||
public void setRunner() throws UnknownHostException {
|
||||
address = InetAddress.getByName(ADDRESS);
|
||||
port = NetworkUtils.getAvailableUdpPort();
|
||||
runner = TestRunners.newTestRunner(PutSyslog.class);
|
||||
runner.setProperty(PutSyslog.HOSTNAME, ADDRESS);
|
||||
|
@ -132,7 +137,7 @@ public class TestPutSyslog {
|
|||
private void assertSyslogMessageSuccess(final String expectedSyslogMessage, final Map<String, String> attributes) throws InterruptedException {
|
||||
final BlockingQueue<ByteArrayMessage> messages = new LinkedBlockingQueue<>();
|
||||
final byte[] delimiter = DELIMITER.getBytes(CHARSET);
|
||||
final NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(runner.getLogger(), ADDRESS, port, protocol, delimiter, MAX_FRAME_LENGTH, messages);
|
||||
final NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(runner.getLogger(), address, port, protocol, delimiter, MAX_FRAME_LENGTH, messages);
|
||||
serverFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
|
||||
serverFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
|
||||
final EventServer eventServer = serverFactory.getEventServer();
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.junit.rules.Timeout;
|
|||
import org.mockito.Mockito;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.net.InetAddress;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
@ -46,6 +47,7 @@ import java.util.concurrent.TimeUnit;
|
|||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TestPutTCP {
|
||||
private final static String TCP_SERVER_ADDRESS = "127.0.0.1";
|
||||
|
@ -72,7 +74,6 @@ public class TestPutTCP {
|
|||
|
||||
private EventServer eventServer;
|
||||
private int port;
|
||||
private TransportProtocol PROTOCOL = TransportProtocol.TCP;
|
||||
private TestRunner runner;
|
||||
private BlockingQueue<ByteArrayMessage> messages;
|
||||
|
||||
|
@ -107,7 +108,7 @@ public class TestPutTCP {
|
|||
@Test
|
||||
public void testRunSuccess() throws Exception {
|
||||
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
|
||||
createTestServer(TCP_SERVER_ADDRESS, port);
|
||||
createTestServer(port);
|
||||
sendTestData(VALID_FILES);
|
||||
assertMessagesReceived(VALID_FILES);
|
||||
}
|
||||
|
@ -126,7 +127,7 @@ public class TestPutTCP {
|
|||
runner.enableControllerService(sslContextService);
|
||||
runner.setProperty(PutTCP.SSL_CONTEXT_SERVICE, identifier);
|
||||
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
|
||||
createTestServer(TCP_SERVER_ADDRESS, port, sslContext);
|
||||
createTestServer(port, sslContext);
|
||||
sendTestData(VALID_FILES);
|
||||
assertMessagesReceived(VALID_FILES);
|
||||
}
|
||||
|
@ -134,7 +135,7 @@ public class TestPutTCP {
|
|||
@Test
|
||||
public void testRunSuccessServerVariableExpression() throws Exception {
|
||||
configureProperties(TCP_SERVER_ADDRESS_EL, OUTGOING_MESSAGE_DELIMITER, false);
|
||||
createTestServer(TCP_SERVER_ADDRESS, port);
|
||||
createTestServer(port);
|
||||
sendTestData(VALID_FILES);
|
||||
assertMessagesReceived(VALID_FILES);
|
||||
}
|
||||
|
@ -142,7 +143,7 @@ public class TestPutTCP {
|
|||
@Test
|
||||
public void testRunSuccessPruneSenders() throws Exception {
|
||||
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
|
||||
createTestServer(TCP_SERVER_ADDRESS, port);
|
||||
createTestServer(port);
|
||||
sendTestData(VALID_FILES);
|
||||
assertTransfers(VALID_FILES.length);
|
||||
assertMessagesReceived(VALID_FILES);
|
||||
|
@ -158,7 +159,7 @@ public class TestPutTCP {
|
|||
@Test
|
||||
public void testRunSuccessMultiCharDelimiter() throws Exception {
|
||||
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false);
|
||||
createTestServer(TCP_SERVER_ADDRESS, port);
|
||||
createTestServer(port);
|
||||
sendTestData(VALID_FILES);
|
||||
assertMessagesReceived(VALID_FILES);
|
||||
}
|
||||
|
@ -166,7 +167,7 @@ public class TestPutTCP {
|
|||
@Test
|
||||
public void testRunSuccessConnectionPerFlowFile() throws Exception {
|
||||
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
|
||||
createTestServer(TCP_SERVER_ADDRESS, port);
|
||||
createTestServer(port);
|
||||
sendTestData(VALID_FILES);
|
||||
assertMessagesReceived(VALID_FILES);
|
||||
}
|
||||
|
@ -174,7 +175,7 @@ public class TestPutTCP {
|
|||
@Test
|
||||
public void testRunSuccessConnectionFailure() throws Exception {
|
||||
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
|
||||
createTestServer(TCP_SERVER_ADDRESS, port);
|
||||
createTestServer(port);
|
||||
sendTestData(VALID_FILES);
|
||||
assertMessagesReceived(VALID_FILES);
|
||||
|
||||
|
@ -184,7 +185,7 @@ public class TestPutTCP {
|
|||
runner.assertQueueEmpty();
|
||||
|
||||
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
|
||||
createTestServer(TCP_SERVER_ADDRESS, port);
|
||||
createTestServer(port);
|
||||
sendTestData(VALID_FILES);
|
||||
assertMessagesReceived(VALID_FILES);
|
||||
}
|
||||
|
@ -192,7 +193,7 @@ public class TestPutTCP {
|
|||
@Test
|
||||
public void testRunSuccessEmptyFile() throws Exception {
|
||||
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
|
||||
createTestServer(TCP_SERVER_ADDRESS, port);
|
||||
createTestServer(port);
|
||||
sendTestData(EMPTY_FILE);
|
||||
assertTransfers(1);
|
||||
runner.assertQueueEmpty();
|
||||
|
@ -201,7 +202,7 @@ public class TestPutTCP {
|
|||
@Test
|
||||
public void testRunSuccessLargeValidFile() throws Exception {
|
||||
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
|
||||
createTestServer(TCP_SERVER_ADDRESS, port);
|
||||
createTestServer(port);
|
||||
final String[] testData = createContent(VALID_LARGE_FILE_SIZE);
|
||||
sendTestData(testData);
|
||||
assertMessagesReceived(testData);
|
||||
|
@ -210,17 +211,23 @@ public class TestPutTCP {
|
|||
@Test
|
||||
public void testRunSuccessFiveHundredMessages() throws Exception {
|
||||
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
|
||||
createTestServer(TCP_SERVER_ADDRESS, port);
|
||||
createTestServer(port);
|
||||
Thread.sleep(1000);
|
||||
final String[] testData = createContent(VALID_SMALL_FILE_SIZE);
|
||||
sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT);
|
||||
assertMessagesReceived(testData, LOAD_TEST_ITERATIONS);
|
||||
}
|
||||
|
||||
private void createTestServer(final String address, final int port, final SSLContext sslContext) throws Exception {
|
||||
private void createTestServer(final int port) throws Exception {
|
||||
createTestServer(port, null);
|
||||
}
|
||||
|
||||
private void createTestServer(final int port, final SSLContext sslContext) throws Exception {
|
||||
messages = new LinkedBlockingQueue<>();
|
||||
final byte[] delimiter = getDelimiter();
|
||||
NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(runner.getLogger(), address, port, PROTOCOL, delimiter, VALID_LARGE_FILE_SIZE, messages);
|
||||
final InetAddress listenAddress = InetAddress.getByName(TCP_SERVER_ADDRESS);
|
||||
NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(runner.getLogger(),
|
||||
listenAddress, port, TransportProtocol.TCP, delimiter, VALID_LARGE_FILE_SIZE, messages);
|
||||
if (sslContext != null) {
|
||||
serverFactory.setSslContext(sslContext);
|
||||
}
|
||||
|
@ -229,10 +236,6 @@ public class TestPutTCP {
|
|||
eventServer = serverFactory.getEventServer();
|
||||
}
|
||||
|
||||
private void createTestServer(final String address, final int port) throws Exception {
|
||||
createTestServer(address, port, null);
|
||||
}
|
||||
|
||||
private void shutdownServer() {
|
||||
if (eventServer != null) {
|
||||
eventServer.shutdown();
|
||||
|
@ -280,7 +283,7 @@ public class TestPutTCP {
|
|||
for (String item : sentData) {
|
||||
final ByteArrayMessage message = messages.take();
|
||||
assertNotNull(String.format("Message [%d] not found", i), message);
|
||||
assert(Arrays.asList(sentData).contains(new String(message.getMessage())));
|
||||
assertTrue(Arrays.asList(sentData).contains(new String(message.getMessage())));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ import static org.junit.Assert.assertArrayEquals;
|
|||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
@ -37,7 +38,6 @@ import org.apache.nifi.util.TestRunner;
|
|||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestPutUDP {
|
||||
|
@ -45,8 +45,6 @@ public class TestPutUDP {
|
|||
private final static String UDP_SERVER_ADDRESS = "127.0.0.1";
|
||||
private final static String SERVER_VARIABLE = "ALKJAFLKJDFLSKJSDFLKJSDF";
|
||||
private final static String UDP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE + "}";
|
||||
private final static String UNKNOWN_HOST = "fgdsfgsdffd";
|
||||
private final static String INVALID_IP_ADDRESS = "300.300.300.300";
|
||||
private static final String DELIMITER = "\n";
|
||||
private static final Charset CHARSET = StandardCharsets.UTF_8;
|
||||
private final static int MAX_FRAME_LENGTH = 32800;
|
||||
|
@ -64,7 +62,6 @@ public class TestPutUDP {
|
|||
|
||||
private TestRunner runner;
|
||||
private int port;
|
||||
private TransportProtocol PROTOCOL = TransportProtocol.UDP;
|
||||
private EventServer eventServer;
|
||||
private BlockingQueue<ByteArrayMessage> messages;
|
||||
|
||||
|
@ -78,13 +75,15 @@ public class TestPutUDP {
|
|||
runner = TestRunners.newTestRunner(PutUDP.class);
|
||||
runner.setVariable(SERVER_VARIABLE, UDP_SERVER_ADDRESS);
|
||||
port = NetworkUtils.getAvailableUdpPort();
|
||||
createTestServer(UDP_SERVER_ADDRESS, port, VALID_LARGE_FILE_SIZE);
|
||||
createTestServer(port, VALID_LARGE_FILE_SIZE);
|
||||
}
|
||||
|
||||
private void createTestServer(final String address, final int port, final int frameSize) throws Exception {
|
||||
private void createTestServer(final int port, final int frameSize) throws Exception {
|
||||
messages = new LinkedBlockingQueue<>();
|
||||
final byte[] delimiter = DELIMITER.getBytes(CHARSET);
|
||||
NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(runner.getLogger(), address, port, PROTOCOL, delimiter, frameSize, messages);
|
||||
final InetAddress listenAddress = InetAddress.getByName(UDP_SERVER_ADDRESS);
|
||||
NettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory(
|
||||
runner.getLogger(), listenAddress, port, TransportProtocol.UDP, delimiter, frameSize, messages);
|
||||
serverFactory.setSocketReceiveBuffer(MAX_FRAME_LENGTH);
|
||||
serverFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
|
||||
serverFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
|
||||
|
@ -92,7 +91,7 @@ public class TestPutUDP {
|
|||
}
|
||||
|
||||
@After
|
||||
public void cleanup() throws Exception {
|
||||
public void cleanup() {
|
||||
runner.shutdown();
|
||||
removeTestServer();
|
||||
}
|
||||
|
@ -106,7 +105,7 @@ public class TestPutUDP {
|
|||
|
||||
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
|
||||
public void testValidFiles() throws Exception {
|
||||
configureProperties(UDP_SERVER_ADDRESS, true);
|
||||
configureProperties(UDP_SERVER_ADDRESS);
|
||||
sendTestData(VALID_FILES);
|
||||
checkReceivedAllData(VALID_FILES);
|
||||
checkInputQueueIsEmpty();
|
||||
|
@ -114,7 +113,7 @@ public class TestPutUDP {
|
|||
|
||||
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
|
||||
public void testValidFilesEL() throws Exception {
|
||||
configureProperties(UDP_SERVER_ADDRESS_EL, true);
|
||||
configureProperties(UDP_SERVER_ADDRESS_EL);
|
||||
sendTestData(VALID_FILES);
|
||||
checkReceivedAllData(VALID_FILES);
|
||||
checkInputQueueIsEmpty();
|
||||
|
@ -122,7 +121,7 @@ public class TestPutUDP {
|
|||
|
||||
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
|
||||
public void testEmptyFile() throws Exception {
|
||||
configureProperties(UDP_SERVER_ADDRESS, true);
|
||||
configureProperties(UDP_SERVER_ADDRESS);
|
||||
sendTestData(EMPTY_FILE);
|
||||
checkRelationships(EMPTY_FILE.length, 0);
|
||||
checkNoDataReceived();
|
||||
|
@ -131,7 +130,7 @@ public class TestPutUDP {
|
|||
|
||||
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
|
||||
public void testLargeValidFile() throws Exception {
|
||||
configureProperties(UDP_SERVER_ADDRESS, true);
|
||||
configureProperties(UDP_SERVER_ADDRESS);
|
||||
final String[] testData = createContent(VALID_LARGE_FILE_SIZE);
|
||||
sendTestData(testData);
|
||||
checkReceivedAllData(testData);
|
||||
|
@ -140,7 +139,7 @@ public class TestPutUDP {
|
|||
|
||||
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
|
||||
public void testLargeInvalidFile() throws Exception {
|
||||
configureProperties(UDP_SERVER_ADDRESS, true);
|
||||
configureProperties(UDP_SERVER_ADDRESS);
|
||||
String[] testData = createContent(INVALID_LARGE_FILE_SIZE);
|
||||
sendTestData(testData);
|
||||
checkRelationships(0, testData.length);
|
||||
|
@ -148,37 +147,17 @@ public class TestPutUDP {
|
|||
checkInputQueueIsEmpty();
|
||||
}
|
||||
|
||||
@Ignore("This test is failing intermittently as documented in NIFI-4288")
|
||||
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
|
||||
public void testInvalidIPAddress() throws Exception {
|
||||
configureProperties(INVALID_IP_ADDRESS, true);
|
||||
sendTestData(VALID_FILES);
|
||||
checkNoDataReceived();
|
||||
checkRelationships(0, VALID_FILES.length);
|
||||
checkInputQueueIsEmpty();
|
||||
}
|
||||
|
||||
@Ignore("This test is failing incorrectly as documented in NIFI-1795")
|
||||
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
|
||||
public void testUnknownHostname() throws Exception {
|
||||
configureProperties(UNKNOWN_HOST, true);
|
||||
sendTestData(VALID_FILES);
|
||||
checkNoDataReceived();
|
||||
checkRelationships(0, VALID_FILES.length);
|
||||
checkInputQueueIsEmpty();
|
||||
}
|
||||
|
||||
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
|
||||
public void testReconfiguration() throws Exception {
|
||||
configureProperties(UDP_SERVER_ADDRESS, true);
|
||||
configureProperties(UDP_SERVER_ADDRESS);
|
||||
sendTestData(VALID_FILES);
|
||||
checkReceivedAllData(VALID_FILES);
|
||||
reset(UDP_SERVER_ADDRESS, port, MAX_FRAME_LENGTH);
|
||||
configureProperties(UDP_SERVER_ADDRESS, true);
|
||||
reset(port);
|
||||
configureProperties(UDP_SERVER_ADDRESS);
|
||||
sendTestData(VALID_FILES);
|
||||
checkReceivedAllData(VALID_FILES);
|
||||
reset(UDP_SERVER_ADDRESS, port, MAX_FRAME_LENGTH);
|
||||
configureProperties(UDP_SERVER_ADDRESS, true);
|
||||
reset(port);
|
||||
configureProperties(UDP_SERVER_ADDRESS);
|
||||
sendTestData(VALID_FILES);
|
||||
checkReceivedAllData(VALID_FILES);
|
||||
checkInputQueueIsEmpty();
|
||||
|
@ -187,28 +166,23 @@ public class TestPutUDP {
|
|||
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
|
||||
public void testLoadTest() throws Exception {
|
||||
final String[] testData = createContent(VALID_SMALL_FILE_SIZE);
|
||||
configureProperties(UDP_SERVER_ADDRESS, true);
|
||||
configureProperties(UDP_SERVER_ADDRESS);
|
||||
sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT);
|
||||
checkReceivedAllData(testData, LOAD_TEST_ITERATIONS);
|
||||
checkInputQueueIsEmpty();
|
||||
}
|
||||
|
||||
private void reset(final String address, final int port, final int frameSize) throws Exception {
|
||||
private void reset(final int port) throws Exception {
|
||||
runner.clearTransferState();
|
||||
removeTestServer();
|
||||
createTestServer(address, port, frameSize);
|
||||
createTestServer(port, MAX_FRAME_LENGTH);
|
||||
}
|
||||
|
||||
private void configureProperties(final String host, final boolean expectValid) {
|
||||
private void configureProperties(final String host) {
|
||||
runner.setProperty(PutUDP.HOSTNAME, host);
|
||||
runner.setProperty(PutUDP.PORT, Integer.toString(port));
|
||||
runner.setProperty(PutUDP.MAX_SOCKET_SEND_BUFFER_SIZE, "40000B");
|
||||
|
||||
if (expectValid) {
|
||||
runner.assertValid();
|
||||
} else {
|
||||
runner.assertNotValid();
|
||||
}
|
||||
runner.assertValid();
|
||||
}
|
||||
|
||||
private void sendTestData(final String[] testData) throws InterruptedException {
|
||||
|
|
Loading…
Reference in New Issue