NIFI-8789 Corrected TestListenUDP to use getAvailableUdpPort()

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5219.
This commit is contained in:
exceptionfactory 2021-07-15 20:31:02 -05:00 committed by Pierre Villard
parent e4ff6f95a0
commit 00f385b51b
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
1 changed files with 31 additions and 35 deletions

View File

@ -31,17 +31,22 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.channels.DatagramChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@RunWith(MockitoJUnitRunner.class)
public class TestListenUDP {
private static final String LOCALHOST = "localhost";
@ -50,10 +55,13 @@ public class TestListenUDP {
private TestRunner runner;
@Mock
private ChannelResponder<DatagramChannel> responder;
@Before
public void setUp() throws Exception {
runner = TestRunners.newTestRunner(ListenUDP.class);
port = NetworkUtils.availablePort();
port = NetworkUtils.getAvailableUdpPort();
runner.setProperty(ListenUDP.PORT, Integer.toString(port));
}
@ -62,7 +70,7 @@ public class TestListenUDP {
runner.setProperty(ListenUDP.PORT, "1");
runner.assertValid();
runner.setProperty(ListenUDP.SENDING_HOST, "localhost");
runner.setProperty(ListenUDP.SENDING_HOST, LOCALHOST);
runner.assertNotValid();
runner.setProperty(ListenUDP.SENDING_HOST_PORT, "1234");
@ -75,12 +83,9 @@ public class TestListenUDP {
@Test
public void testDefaultBehavior() throws IOException, InterruptedException {
final List<String> messages = getMessages(15);
final int expectedQueued = messages.size();
final int expectedTransferred = messages.size();
// default behavior should produce a FlowFile per message sent
run(new DatagramSocket(), messages, expectedQueued, expectedTransferred);
run(new DatagramSocket(), messages, expectedTransferred);
runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, messages.size());
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
@ -95,7 +100,7 @@ public class TestListenUDP {
final List<String> messages = getMessages(20);
run(new DatagramSocket(), messages, maxQueueSize, maxQueueSize);
run(new DatagramSocket(), messages, maxQueueSize);
runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, maxQueueSize);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
@ -110,10 +115,9 @@ public class TestListenUDP {
runner.setProperty(ListenUDP.MAX_BATCH_SIZE, "3");
final List<String> messages = getMessages(5);
final int expectedQueued = messages.size();
final int expectedTransferred = 2;
run(new DatagramSocket(), messages, expectedQueued, expectedTransferred);
run(new DatagramSocket(), messages, expectedTransferred);
runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, expectedTransferred);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
@ -131,55 +135,52 @@ public class TestListenUDP {
public void testBatchingWithDifferentSenders() {
final String sender1 = "sender1";
final String sender2 = "sender2";
final ChannelResponder responder = Mockito.mock(ChannelResponder.class);
final byte[] message = "test message".getBytes(StandardCharsets.UTF_8);
final List<StandardEvent> mockEvents = new ArrayList<>();
mockEvents.add(new StandardEvent(sender1, message, responder));
mockEvents.add(new StandardEvent(sender1, message, responder));
mockEvents.add(new StandardEvent(sender2, message, responder));
mockEvents.add(new StandardEvent(sender2, message, responder));
final List<StandardEvent<DatagramChannel>> mockEvents = new ArrayList<>();
mockEvents.add(new StandardEvent<>(sender1, message, responder));
mockEvents.add(new StandardEvent<>(sender1, message, responder));
mockEvents.add(new StandardEvent<>(sender2, message, responder));
mockEvents.add(new StandardEvent<>(sender2, message, responder));
MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
runner = TestRunners.newTestRunner(mockListenUDP);
runner.setProperty(ListenRELP.PORT, "1");
runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
runner.setProperty(ListenUDP.PORT, "1");
runner.setProperty(ListenUDP.MAX_BATCH_SIZE, "10");
// sending 4 messages with a batch size of 10, but should get 2 FlowFiles because of different senders
runner.run();
runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, 2);
verifyProvenance(2);
}
@Test
public void testRunWhenNoEventsAvailable() {
final List<StandardEvent> mockEvents = new ArrayList<>();
final List<StandardEvent<DatagramChannel>> mockEvents = new ArrayList<>();
MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
runner = TestRunners.newTestRunner(mockListenUDP);
runner.setProperty(ListenRELP.PORT, "1");
runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
runner.setProperty(ListenUDP.PORT, "1");
runner.setProperty(ListenUDP.MAX_BATCH_SIZE, "10");
runner.run(5);
runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0);
runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, 0);
}
@Test
public void testWithSendingHostAndPortSameAsSender() throws IOException, InterruptedException {
final String sendingHost = "localhost";
final Integer sendingPort = 21001;
runner.setProperty(ListenUDP.SENDING_HOST, sendingHost);
final Integer sendingPort = NetworkUtils.getAvailableUdpPort();
runner.setProperty(ListenUDP.SENDING_HOST, LOCALHOST);
runner.setProperty(ListenUDP.SENDING_HOST_PORT, String.valueOf(sendingPort));
// bind to the same sending port that processor has for Sending Host Port
final DatagramSocket socket = new DatagramSocket(sendingPort);
final List<String> messages = getMessages(6);
final int expectedQueued = messages.size();
final int expectedTransferred = messages.size();
run(socket, messages, expectedQueued, expectedTransferred);
run(socket, messages, expectedTransferred);
runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, messages.size());
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
@ -214,11 +215,8 @@ public class TestListenUDP {
}
}
protected void run(final DatagramSocket socket, final List<String> messages, final int expectedQueueSize, final int expectedTransferred)
protected void run(final DatagramSocket socket, final List<String> messages, final int expectedTransferred)
throws IOException, InterruptedException {
// Run Processor and start Dispatcher without shutting down
runner.run(1, false, true);
@ -242,9 +240,9 @@ public class TestListenUDP {
// Extend ListenUDP to mock the ChannelDispatcher and allow us to return staged events
private static class MockListenUDP extends ListenUDP {
private List<StandardEvent> mockEvents;
private final List<StandardEvent<DatagramChannel>> mockEvents;
public MockListenUDP(List<StandardEvent> mockEvents) {
public MockListenUDP(List<StandardEvent<DatagramChannel>> mockEvents) {
this.mockEvents = mockEvents;
}
@ -259,7 +257,5 @@ public class TestListenUDP {
protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<StandardEvent> events) {
return Mockito.mock(ChannelDispatcher.class);
}
}
}