NIFI-8304 This closes #4900. Improved Socket test reliability for several Processors

- Refactored TestPutTCP to single class
- Improved TestListenRELP
- Improved TestListenTCP
- Improved TestListenUDP
- Improved TestListenTCPRecord
- Changed OnUnscheduled to OnStopped in AbstractListenEventProcessor

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
exceptionfactory 2021-03-15 14:04:30 -05:00 committed by Joe Witt
parent f00f0ad269
commit 2ad88bbfff
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
9 changed files with 482 additions and 834 deletions

View File

@ -18,7 +18,7 @@ package org.apache.nifi.processor.util.listen;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
@ -220,8 +220,8 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
return events == null ? 0 : events.size(); return events == null ? 0 : events.size();
} }
@OnUnscheduled @OnStopped
public void onUnscheduled() { public void closeDispatcher() {
if (dispatcher != null) { if (dispatcher != null) {
dispatcher.close(); dispatcher.close();
} }

View File

@ -46,7 +46,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
@ -299,8 +299,8 @@ public class ListenTCPRecord extends AbstractProcessor {
readerThread.start(); readerThread.start();
} }
@OnUnscheduled @OnStopped
public void onUnscheduled() { public void onStopped() {
if (dispatcher != null) { if (dispatcher != null) {
dispatcher.close(); dispatcher.close();
dispatcher = null; dispatcher = null;
@ -460,9 +460,4 @@ public class ListenTCPRecord extends AbstractProcessor {
private String getRemoteAddress(final SocketChannelRecordReader socketChannelRecordReader) { private String getRemoteAddress(final SocketChannelRecordReader socketChannelRecordReader) {
return socketChannelRecordReader.getRemoteAddress() == null ? "null" : socketChannelRecordReader.getRemoteAddress().toString(); return socketChannelRecordReader.getRemoteAddress() == null ? "null" : socketChannelRecordReader.getRemoteAddress().toString();
} }
public final int getDispatcherPort() {
return dispatcher == null ? 0 : dispatcher.getPort();
}
} }

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@ -24,21 +25,21 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.response.ChannelResponder; import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.standard.relp.event.RELPEvent; import org.apache.nifi.processors.standard.relp.event.RELPEvent;
import org.apache.nifi.processors.standard.relp.frame.RELPEncoder; import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
import org.apache.nifi.processors.standard.relp.frame.RELPFrame; import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
import org.apache.nifi.processors.standard.relp.response.RELPResponse;
import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.TlsException; import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
@ -47,8 +48,12 @@ import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class TestListenRELP { public class TestListenRELP {
public static final String OPEN_FRAME_DATA = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog"; public static final String OPEN_FRAME_DATA = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog";
@ -75,82 +80,83 @@ public class TestListenRELP {
.data(new byte[0]) .data(new byte[0])
.build(); .build();
private static final String LOCALHOST = "localhost";
@Mock
private ChannelResponder<SocketChannel> responder;
@Mock
private ChannelDispatcher channelDispatcher;
@Mock
private RestrictedSSLContextService sslContextService;
private RELPEncoder encoder; private RELPEncoder encoder;
private ResponseCapturingListenRELP proc;
private TestRunner runner; private TestRunner runner;
@Before @Before
public void setup() { public void setup() {
encoder = new RELPEncoder(StandardCharsets.UTF_8); encoder = new RELPEncoder(StandardCharsets.UTF_8);
proc = new ResponseCapturingListenRELP(); runner = TestRunners.newTestRunner(ListenRELP.class);
runner = TestRunners.newTestRunner(proc);
runner.setProperty(ListenRELP.PORT, "0");
} }
@Test @Test
public void testListenRELP() throws IOException, InterruptedException { public void testRun() throws IOException {
final List<RELPFrame> frames = new ArrayList<>(); final int syslogFrames = 5;
frames.add(OPEN_FRAME); final List<RELPFrame> frames = getFrames(syslogFrames);
frames.add(SYSLOG_FRAME);
frames.add(SYSLOG_FRAME);
frames.add(SYSLOG_FRAME);
frames.add(CLOSE_FRAME);
// three syslog frames should be transferred and three responses should be sent // three syslog frames should be transferred and three responses should be sent
run(frames, 3, 3, null); run(frames, syslogFrames, syslogFrames, null);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
Assert.assertNotNull(events); Assert.assertNotNull(events);
Assert.assertEquals(3, events.size()); Assert.assertEquals(syslogFrames, events.size());
final ProvenanceEventRecord event = events.get(0); final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("relp")); Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("relp"));
final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenRELP.REL_SUCCESS); final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenRELP.REL_SUCCESS);
Assert.assertEquals(3, mockFlowFiles.size()); Assert.assertEquals(syslogFrames, mockFlowFiles.size());
final MockFlowFile mockFlowFile = mockFlowFiles.get(0); final MockFlowFile mockFlowFile = mockFlowFiles.get(0);
Assert.assertEquals(String.valueOf(SYSLOG_FRAME.getTxnr()), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.TXNR.key())); Assert.assertEquals(String.valueOf(SYSLOG_FRAME.getTxnr()), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.TXNR.key()));
Assert.assertEquals(SYSLOG_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key())); Assert.assertEquals(SYSLOG_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
Assert.assertTrue(!StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key()))); Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
Assert.assertTrue(!StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key()))); Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
} }
@Test @Test
public void testBatching() throws IOException, InterruptedException { public void testRunBatching() throws IOException {
runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "5"); runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "5");
final List<RELPFrame> frames = new ArrayList<>(); final int syslogFrames = 3;
frames.add(OPEN_FRAME); final List<RELPFrame> frames = getFrames(syslogFrames);
frames.add(SYSLOG_FRAME);
frames.add(SYSLOG_FRAME);
frames.add(SYSLOG_FRAME);
frames.add(CLOSE_FRAME);
// one syslog frame should be transferred since we are batching, but three responses should be sent // one syslog frame should be transferred since we are batching, but three responses should be sent
run(frames, 1, 3, null); final int expectedFlowFiles = 1;
run(frames, expectedFlowFiles, syslogFrames, null);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
Assert.assertNotNull(events); Assert.assertNotNull(events);
Assert.assertEquals(1, events.size()); Assert.assertEquals(expectedFlowFiles, events.size());
final ProvenanceEventRecord event = events.get(0); final ProvenanceEventRecord event = events.get(0);
Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("relp")); Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("relp"));
final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenRELP.REL_SUCCESS); final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenRELP.REL_SUCCESS);
Assert.assertEquals(1, mockFlowFiles.size()); Assert.assertEquals(expectedFlowFiles, mockFlowFiles.size());
final MockFlowFile mockFlowFile = mockFlowFiles.get(0); final MockFlowFile mockFlowFile = mockFlowFiles.get(0);
Assert.assertEquals(SYSLOG_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key())); Assert.assertEquals(SYSLOG_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
Assert.assertTrue(!StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key()))); Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
Assert.assertTrue(!StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key()))); Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
} }
@Test @Test
public void testMutualTls() throws IOException, InterruptedException, TlsException, InitializationException { public void testRunMutualTls() throws IOException, TlsException, InitializationException {
final SSLContextService sslContextService = Mockito.mock(SSLContextService.class);
final String serviceIdentifier = SSLContextService.class.getName(); final String serviceIdentifier = SSLContextService.class.getName();
Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier); Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
final SSLContext sslContext = SslContextUtils.createKeyStoreSslContext(); final SSLContext sslContext = SslContextUtils.createKeyStoreSslContext();
@ -160,33 +166,26 @@ public class TestListenRELP {
runner.setProperty(ListenRELP.SSL_CONTEXT_SERVICE, serviceIdentifier); runner.setProperty(ListenRELP.SSL_CONTEXT_SERVICE, serviceIdentifier);
final List<RELPFrame> frames = new ArrayList<>(); final int syslogFrames = 3;
frames.add(OPEN_FRAME); final List<RELPFrame> frames = getFrames(syslogFrames);
frames.add(SYSLOG_FRAME); run(frames, syslogFrames, syslogFrames, sslContext);
frames.add(SYSLOG_FRAME);
frames.add(SYSLOG_FRAME);
frames.add(SYSLOG_FRAME);
frames.add(SYSLOG_FRAME);
frames.add(CLOSE_FRAME);
run(frames, 5, 5, sslContext);
} }
@Test @Test
public void testNoEventsAvailable() throws IOException, InterruptedException { public void testRunNoEventsAvailable() {
MockListenRELP mockListenRELP = new MockListenRELP(new ArrayList<RELPEvent>()); MockListenRELP mockListenRELP = new MockListenRELP(new ArrayList<>());
runner = TestRunners.newTestRunner(mockListenRELP); runner = TestRunners.newTestRunner(mockListenRELP);
runner.setProperty(ListenRELP.PORT, "1"); runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0); runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0);
runner.shutdown();
} }
@Test @Test
public void testBatchingWithDifferentSenders() throws IOException, InterruptedException { public void testBatchingWithDifferentSenders() {
final String sender1 = "sender1"; final String sender1 = "sender1";
final String sender2 = "sender2"; final String sender2 = "sender2";
final ChannelResponder<SocketChannel> responder = Mockito.mock(ChannelResponder.class);
final List<RELPEvent> mockEvents = new ArrayList<>(); final List<RELPEvent> mockEvents = new ArrayList<>();
mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand())); mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
@ -196,96 +195,68 @@ public class TestListenRELP {
MockListenRELP mockListenRELP = new MockListenRELP(mockEvents); MockListenRELP mockListenRELP = new MockListenRELP(mockEvents);
runner = TestRunners.newTestRunner(mockListenRELP); runner = TestRunners.newTestRunner(mockListenRELP);
runner.setProperty(ListenRELP.PORT, "1"); runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10"); runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2); runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
runner.shutdown();
} }
private void run(final List<RELPFrame> frames, final int flowFiles, final int responses, final SSLContext sslContext)
throws IOException {
protected void run(final List<RELPFrame> frames, final int expectedTransferred, final int expectedResponses, final SSLContext sslContext) final int port = NetworkUtils.availablePort();
throws IOException, InterruptedException { runner.setProperty(ListenRELP.PORT, Integer.toString(port));
Socket socket = null; // Run Processor and start Dispatcher without shutting down
try { runner.run(1, false, true);
// schedule to start listening on a random port
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
proc.onScheduled(context);
// create a client connection to the port the dispatcher is listening on try (final Socket socket = getSocket(port, sslContext)) {
final int realPort = proc.getDispatcherPort(); final OutputStream outputStream = socket.getOutputStream();
sendFrames(frames, outputStream);
// create either a regular socket or ssl socket based on context being passed in // Run Processor for number of responses
if (sslContext == null) { runner.run(responses, false, false);
socket = new Socket("localhost", realPort);
} else {
socket = sslContext.getSocketFactory().createSocket("localhost", realPort);
}
Thread.sleep(100);
// send the frames to the port the processors is listening on
sendFrames(frames, socket);
long responseTimeout = 30000;
// this first loop waits until the internal queue of the processor has the expected
// number of messages ready before proceeding, we want to guarantee they are all there
// before onTrigger gets a chance to run
long startTimeQueueSizeCheck = System.currentTimeMillis();
while (proc.getQueueSize() < expectedResponses
&& (System.currentTimeMillis() - startTimeQueueSizeCheck < responseTimeout)) {
Thread.sleep(100);
}
// want to fail here if the queue size isn't what we expect
Assert.assertEquals(expectedResponses, proc.getQueueSize());
// call onTrigger until we got a respond for all the frames, or a certain amount of time passes
long startTimeProcessing = System.currentTimeMillis();
while (proc.responses.size() < expectedResponses
&& (System.currentTimeMillis() - startTimeProcessing < responseTimeout)) {
proc.onTrigger(context, processSessionFactory);
Thread.sleep(100);
}
// should have gotten a response for each frame
Assert.assertEquals(expectedResponses, proc.responses.size());
// should have transferred the expected events
runner.assertTransferCount(ListenRELP.REL_SUCCESS, expectedTransferred);
runner.assertTransferCount(ListenRELP.REL_SUCCESS, flowFiles);
} finally { } finally {
// unschedule to close connections runner.shutdown();
proc.onUnscheduled();
IOUtils.closeQuietly(socket);
} }
} }
private void sendFrames(final List<RELPFrame> frames, final Socket socket) throws IOException, InterruptedException { private void sendFrames(final List<RELPFrame> frames, final OutputStream outputStream) throws IOException {
// send the provided messages
for (final RELPFrame frame : frames) { for (final RELPFrame frame : frames) {
byte[] encodedFrame = encoder.encode(frame); final byte[] encodedFrame = encoder.encode(frame);
socket.getOutputStream().write(encodedFrame); outputStream.write(encodedFrame);
outputStream.flush();
} }
socket.getOutputStream().flush();
} }
// Extend ListenRELP so we can use the CapturingSocketChannelResponseDispatcher private Socket getSocket(final int port, final SSLContext sslContext) throws IOException {
private static class ResponseCapturingListenRELP extends ListenRELP { final Socket socket;
if (sslContext == null) {
private final List<RELPResponse> responses = new ArrayList<>(); socket = new Socket(LOCALHOST, port);
} else {
@Override socket = sslContext.getSocketFactory().createSocket(LOCALHOST, port);
protected void respond(RELPEvent event, RELPResponse relpResponse) {
this.responses.add(relpResponse);
super.respond(event, relpResponse);
} }
return socket;
}
private List<RELPFrame> getFrames(final int syslogFrames) {
final List<RELPFrame> frames = new ArrayList<>();
frames.add(OPEN_FRAME);
for (int i = 0; i < syslogFrames; i++) {
frames.add(SYSLOG_FRAME);
}
frames.add(CLOSE_FRAME);
return frames;
} }
// Extend ListenRELP to mock the ChannelDispatcher and allow us to return staged events // Extend ListenRELP to mock the ChannelDispatcher and allow us to return staged events
private static class MockListenRELP extends ListenRELP { private class MockListenRELP extends ListenRELP {
private final List<RELPEvent> mockEvents; private final List<RELPEvent> mockEvents;
@ -301,8 +272,8 @@ public class TestListenRELP {
} }
@Override @Override
protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<RELPEvent> events) throws IOException { protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<RELPEvent> events) {
return Mockito.mock(ChannelDispatcher.class); return channelDispatcher;
} }
} }

View File

@ -16,9 +16,7 @@
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import org.apache.commons.io.IOUtils; import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.ClientAuth; import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.security.util.TlsException; import org.apache.nifi.security.util.TlsException;
@ -34,24 +32,23 @@ import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
public class TestListenTCP { public class TestListenTCP {
private static final long RESPONSE_TIMEOUT = 10000;
private static final String SSL_CONTEXT_IDENTIFIER = SSLContextService.class.getName(); private static final String SSL_CONTEXT_IDENTIFIER = SSLContextService.class.getName();
private static final String LOCALHOST = "localhost";
private static SSLContext keyStoreSslContext; private static SSLContext keyStoreSslContext;
private static SSLContext trustStoreSslContext; private static SSLContext trustStoreSslContext;
private ListenTCP proc;
private TestRunner runner; private TestRunner runner;
@BeforeClass @BeforeClass
@ -62,9 +59,7 @@ public class TestListenTCP {
@Before @Before
public void setup() { public void setup() {
proc = new ListenTCP(); runner = TestRunners.newTestRunner(ListenTCP.class);
runner = TestRunners.newTestRunner(proc);
runner.setProperty(ListenTCP.PORT, "0");
} }
@Test @Test
@ -81,7 +76,7 @@ public class TestListenTCP {
} }
@Test @Test
public void testListenTCP() throws IOException, InterruptedException { public void testListenTCP() throws IOException {
final List<String> messages = new ArrayList<>(); final List<String> messages = new ArrayList<>();
messages.add("This is message 1\n"); messages.add("This is message 1\n");
messages.add("This is message 2\n"); messages.add("This is message 2\n");
@ -89,7 +84,7 @@ public class TestListenTCP {
messages.add("This is message 4\n"); messages.add("This is message 4\n");
messages.add("This is message 5\n"); messages.add("This is message 5\n");
runTCP(messages, messages.size(), null); run(messages, messages.size(), null);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS); List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
for (int i = 0; i < mockFlowFiles.size(); i++) { for (int i = 0; i < mockFlowFiles.size(); i++) {
@ -98,7 +93,7 @@ public class TestListenTCP {
} }
@Test @Test
public void testListenTCPBatching() throws IOException, InterruptedException { public void testListenTCPBatching() throws IOException {
runner.setProperty(ListenTCP.MAX_BATCH_SIZE, "3"); runner.setProperty(ListenTCP.MAX_BATCH_SIZE, "3");
final List<String> messages = new ArrayList<>(); final List<String> messages = new ArrayList<>();
@ -108,7 +103,7 @@ public class TestListenTCP {
messages.add("This is message 4\n"); messages.add("This is message 4\n");
messages.add("This is message 5\n"); messages.add("This is message 5\n");
runTCP(messages, 2, null); run(messages, 2, null);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS); List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
@ -120,9 +115,7 @@ public class TestListenTCP {
} }
@Test @Test
public void testTLSClientAuthRequiredAndClientCertProvided() throws IOException, InterruptedException, public void testTLSClientAuthRequiredAndClientCertProvided() throws IOException, InitializationException {
InitializationException {
runner.setProperty(ListenTCP.CLIENT_AUTH, ClientAuth.REQUIRED.name()); runner.setProperty(ListenTCP.CLIENT_AUTH, ClientAuth.REQUIRED.name());
enableSslContextService(keyStoreSslContext); enableSslContextService(keyStoreSslContext);
@ -133,8 +126,7 @@ public class TestListenTCP {
messages.add("This is message 4\n"); messages.add("This is message 4\n");
messages.add("This is message 5\n"); messages.add("This is message 5\n");
// Make an SSLContext with a key and trust store to send the test messages run(messages, messages.size(), keyStoreSslContext);
runTCP(messages, messages.size(), keyStoreSslContext);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS); List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
for (int i = 0; i < mockFlowFiles.size(); i++) { for (int i = 0; i < mockFlowFiles.size(); i++) {
@ -154,14 +146,13 @@ public class TestListenTCP {
messages.add("This is message 4\n"); messages.add("This is message 4\n");
messages.add("This is message 5\n"); messages.add("This is message 5\n");
// Make an SSLContext that only has the trust store, this should not work since the processor has client auth REQUIRED
Assert.assertThrows(IOException.class, () -> Assert.assertThrows(IOException.class, () ->
runTCP(messages, messages.size(), trustStoreSslContext) run(messages, messages.size(), trustStoreSslContext)
); );
} }
@Test @Test
public void testTLSClientAuthNoneAndClientCertNotProvided() throws IOException, InterruptedException, InitializationException { public void testTLSClientAuthNoneAndClientCertNotProvided() throws IOException, InitializationException {
runner.setProperty(ListenTCP.CLIENT_AUTH, ClientAuth.NONE.name()); runner.setProperty(ListenTCP.CLIENT_AUTH, ClientAuth.NONE.name());
enableSslContextService(keyStoreSslContext); enableSslContextService(keyStoreSslContext);
@ -172,8 +163,7 @@ public class TestListenTCP {
messages.add("This is message 4\n"); messages.add("This is message 4\n");
messages.add("This is message 5\n"); messages.add("This is message 5\n");
// Make an SSLContext that only has the trust store, this should not work since the processor has client auth REQUIRED run(messages, messages.size(), trustStoreSslContext);
runTCP(messages, messages.size(), trustStoreSslContext);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS); List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS);
for (int i = 0; i < mockFlowFiles.size(); i++) { for (int i = 0; i < mockFlowFiles.size(); i++) {
@ -181,65 +171,41 @@ public class TestListenTCP {
} }
} }
protected void runTCP(final List<String> messages, final int expectedTransferred, final SSLContext sslContext) protected void run(final List<String> messages, final int flowFiles, final SSLContext sslContext)
throws IOException, InterruptedException { throws IOException {
Socket socket = null; final int port = NetworkUtils.availablePort();
try { runner.setProperty(ListenTCP.PORT, Integer.toString(port));
// schedule to start listening on a random port
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
proc.onScheduled(context);
// create a client connection to the port the dispatcher is listening on // Run Processor and start Dispatcher without shutting down
final int realPort = proc.getDispatcherPort(); runner.run(1, false, true);
// create either a regular socket or ssl socket based on context being passed in try (final Socket socket = getSocket(port, sslContext)) {
if (sslContext == null) { final OutputStream outputStream = socket.getOutputStream();
socket = new Socket("localhost", realPort);
} else {
final SocketFactory socketFactory = sslContext.getSocketFactory();
socket = socketFactory.createSocket("localhost", realPort);
}
Thread.sleep(100);
// send the frames to the port the processors is listening on
for (final String message : messages) { for (final String message : messages) {
socket.getOutputStream().write(message.getBytes(StandardCharsets.UTF_8)); outputStream.write(message.getBytes(StandardCharsets.UTF_8));
Thread.sleep(1);
} }
socket.getOutputStream().flush(); outputStream.flush();
// this first loop waits until the internal queue of the processor has the expected // Run Processor for number of responses
// number of messages ready before proceeding, we want to guarantee they are all there runner.run(flowFiles, false, false);
// before onTrigger gets a chance to run
long startTimeQueueSizeCheck = System.currentTimeMillis();
while (proc.getQueueSize() < messages.size()
&& (System.currentTimeMillis() - startTimeQueueSizeCheck < RESPONSE_TIMEOUT)) {
Thread.sleep(100);
}
// want to fail here if the queue size isn't what we expect runner.assertTransferCount(ListenTCP.REL_SUCCESS, flowFiles);
Assert.assertEquals(messages.size(), proc.getQueueSize());
// call onTrigger until we processed all the frames, or a certain amount of time passes
int numTransferred = 0;
long startTime = System.currentTimeMillis();
while (numTransferred < expectedTransferred && (System.currentTimeMillis() - startTime < RESPONSE_TIMEOUT)) {
proc.onTrigger(context, processSessionFactory);
numTransferred = runner.getFlowFilesForRelationship(ListenTCP.REL_SUCCESS).size();
Thread.sleep(100);
}
// should have transferred the expected events
runner.assertTransferCount(ListenTCP.REL_SUCCESS, expectedTransferred);
} finally { } finally {
// unschedule to close connections runner.shutdown();
proc.onUnscheduled();
IOUtils.closeQuietly(socket);
} }
} }
private Socket getSocket(final int port, final SSLContext sslContext) throws IOException {
final Socket socket;
if (sslContext == null) {
socket = new Socket(LOCALHOST, port);
} else {
socket = sslContext.getSocketFactory().createSocket(LOCALHOST, port);
}
return socket;
}
private void enableSslContextService(final SSLContext sslContext) throws InitializationException { private void enableSslContextService(final SSLContext sslContext) throws InitializationException {
final RestrictedSSLContextService sslContextService = Mockito.mock(RestrictedSSLContextService.class); final RestrictedSSLContextService sslContextService = Mockito.mock(RestrictedSSLContextService.class);
Mockito.when(sslContextService.getIdentifier()).thenReturn(SSL_CONTEXT_IDENTIFIER); Mockito.when(sslContextService.getIdentifier()).thenReturn(SSL_CONTEXT_IDENTIFIER);

View File

@ -16,18 +16,17 @@
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.Socket; import java.net.Socket;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.security.util.ClientAuth; import org.apache.nifi.security.util.ClientAuth;
@ -46,12 +45,8 @@ import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestListenTCPRecord { public class TestListenTCPRecord {
static final Logger LOGGER = LoggerFactory.getLogger(TestListenTCPRecord.class);
static final String SCHEMA_TEXT = "{\n" + static final String SCHEMA_TEXT = "{\n" +
" \"name\": \"syslogRecord\",\n" + " \"name\": \"syslogRecord\",\n" +
" \"namespace\": \"nifi\",\n" + " \"namespace\": \"nifi\",\n" +
@ -63,17 +58,13 @@ public class TestListenTCPRecord {
" ]\n" + " ]\n" +
"}"; "}";
static final List<String> DATA; static final String DATA = "[" +
"{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 1\"}," +
"{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 2\"}," +
"{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 3\"}" +
"]";
static { private static final String LOCALHOST = "localhost";
final List<String> data = new ArrayList<>();
data.add("[");
data.add("{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 1\"},");
data.add("{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 2\"},");
data.add("{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 3\"}");
data.add("]");
DATA = Collections.unmodifiableList(data);
}
private static final String SSL_CONTEXT_IDENTIFIER = SSLContextService.class.getName(); private static final String SSL_CONTEXT_IDENTIFIER = SSLContextService.class.getName();
@ -81,7 +72,6 @@ public class TestListenTCPRecord {
private static SSLContext trustStoreSslContext; private static SSLContext trustStoreSslContext;
private ListenTCPRecord proc;
private TestRunner runner; private TestRunner runner;
@BeforeClass @BeforeClass
@ -92,9 +82,7 @@ public class TestListenTCPRecord {
@Before @Before
public void setup() throws InitializationException { public void setup() throws InitializationException {
proc = new ListenTCPRecord(); runner = TestRunners.newTestRunner(ListenTCPRecord.class);
runner = TestRunners.newTestRunner(proc);
runner.setProperty(ListenTCPRecord.PORT, "0");
final String readerId = "record-reader"; final String readerId = "record-reader";
final RecordReaderFactory readerFactory = new JsonTreeReader(); final RecordReaderFactory readerFactory = new JsonTreeReader();
@ -110,7 +98,6 @@ public class TestListenTCPRecord {
runner.setProperty(ListenTCPRecord.RECORD_READER, readerId); runner.setProperty(ListenTCPRecord.RECORD_READER, readerId);
runner.setProperty(ListenTCPRecord.RECORD_WRITER, writerId); runner.setProperty(ListenTCPRecord.RECORD_WRITER, writerId);
} }
@Test @Test
@ -130,7 +117,7 @@ public class TestListenTCPRecord {
public void testOneRecordPerFlowFile() throws IOException, InterruptedException { public void testOneRecordPerFlowFile() throws IOException, InterruptedException {
runner.setProperty(ListenTCPRecord.RECORD_BATCH_SIZE, "1"); runner.setProperty(ListenTCPRecord.RECORD_BATCH_SIZE, "1");
runTCP(DATA, 3, null); run(3, null);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS); List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
for (int i = 0; i < mockFlowFiles.size(); i++) { for (int i = 0; i < mockFlowFiles.size(); i++) {
@ -147,7 +134,7 @@ public class TestListenTCPRecord {
public void testMultipleRecordsPerFlowFileLessThanBatchSize() throws IOException, InterruptedException { public void testMultipleRecordsPerFlowFileLessThanBatchSize() throws IOException, InterruptedException {
runner.setProperty(ListenTCPRecord.RECORD_BATCH_SIZE, "5"); runner.setProperty(ListenTCPRecord.RECORD_BATCH_SIZE, "5");
runTCP(DATA, 1, null); run(1, null);
final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS); final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
Assert.assertEquals(1, mockFlowFiles.size()); Assert.assertEquals(1, mockFlowFiles.size());
@ -167,7 +154,7 @@ public class TestListenTCPRecord {
runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.REQUIRED.name()); runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.REQUIRED.name());
enableSslContextService(keyStoreSslContext); enableSslContextService(keyStoreSslContext);
runTCP(DATA, 1, keyStoreSslContext); run(1, keyStoreSslContext);
final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS); final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
Assert.assertEquals(1, mockFlowFiles.size()); Assert.assertEquals(1, mockFlowFiles.size());
@ -179,22 +166,12 @@ public class TestListenTCPRecord {
Assert.assertTrue(content.contains("This is a test " + 3)); Assert.assertTrue(content.contains("This is a test " + 3));
} }
@Test
public void testTLSClientAuthRequiredAndClientCertNotProvided() throws InitializationException, IOException, InterruptedException {
runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.REQUIRED.name());
runner.setProperty(ListenTCPRecord.READ_TIMEOUT, "5 seconds");
enableSslContextService(keyStoreSslContext);
runTCP(DATA, 0, trustStoreSslContext);
}
@Test @Test
public void testTLSClientAuthNoneAndClientCertNotProvided() throws InitializationException, IOException, InterruptedException { public void testTLSClientAuthNoneAndClientCertNotProvided() throws InitializationException, IOException, InterruptedException {
runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.NONE.name()); runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.NONE.name());
enableSslContextService(keyStoreSslContext); enableSslContextService(keyStoreSslContext);
runTCP(DATA, 1, trustStoreSslContext); run(1, trustStoreSslContext);
final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS); final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
Assert.assertEquals(1, mockFlowFiles.size()); Assert.assertEquals(1, mockFlowFiles.size());
@ -206,88 +183,43 @@ public class TestListenTCPRecord {
Assert.assertTrue(content.contains("This is a test " + 3)); Assert.assertTrue(content.contains("This is a test " + 3));
} }
protected void runTCP(final List<String> messages, final int expectedTransferred, final SSLContext sslContext) protected void run(final int expectedTransferred, final SSLContext sslContext) throws IOException, InterruptedException {
throws IOException, InterruptedException { final int port = NetworkUtils.availablePort();
runner.setProperty(ListenTCPRecord.PORT, Integer.toString(port));
SocketSender sender = null; // Run Processor and start listener without shutting down
try { runner.run(1, false, true);
// schedule to start listening on a random port
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
proc.onScheduled(context);
Thread.sleep(100);
sender = new SocketSender(proc.getDispatcherPort(), "localhost", sslContext, messages, 0); final AtomicBoolean completed = new AtomicBoolean(false);
final Thread thread = new Thread(() -> {
final Thread senderThread = new Thread(sender); try (final Socket socket = getSocket(port, sslContext)) {
senderThread.setDaemon(true); final OutputStream outputStream = socket.getOutputStream();
senderThread.start(); outputStream.write(DATA.getBytes(StandardCharsets.UTF_8));
outputStream.flush();
long timeout = 10000; completed.set(true);
} catch (final IOException e) {
// call onTrigger until we processed all the records, or a certain amount of time passes throw new UncheckedIOException(e);
int numTransferred = 0;
long startTime = System.currentTimeMillis();
while (numTransferred < expectedTransferred && (System.currentTimeMillis() - startTime < timeout)) {
proc.onTrigger(context, processSessionFactory);
numTransferred = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS).size();
Thread.sleep(100);
} }
});
thread.start();
// should have transferred the expected events // Wait for Send Completion
runner.assertTransferCount(ListenTCPRecord.REL_SUCCESS, expectedTransferred); completed.compareAndSet(true, false);
} finally {
// unschedule to close connections // Run Processor for expected FlowFiles with an additional run to ensure completion
proc.onUnscheduled(); final int iterations = expectedTransferred + 1;
IOUtils.closeQuietly(sender); runner.run(iterations, true, false);
} runner.assertTransferCount(ListenTCPRecord.REL_SUCCESS, expectedTransferred);
} }
private static class SocketSender implements Runnable, Closeable { private Socket getSocket(final int port, final SSLContext sslContext) throws IOException {
final Socket socket;
private final int port; if (sslContext == null) {
private final String host; socket = new Socket(LOCALHOST, port);
private final SSLContext sslContext; } else {
private final List<String> data; socket = sslContext.getSocketFactory().createSocket(LOCALHOST, port);
private final long delay;
private Socket socket;
public SocketSender(final int port, final String host, final SSLContext sslContext, final List<String> data, final long delay) {
this.port = port;
this.host = host;
this.sslContext = sslContext;
this.data = data;
this.delay = delay;
}
@Override
public void run() {
try {
if (sslContext != null) {
socket = sslContext.getSocketFactory().createSocket(host, port);
} else {
socket = new Socket(host, port);
}
for (final String message : data) {
socket.getOutputStream().write(message.getBytes(StandardCharsets.UTF_8));
if (delay > 0) {
Thread.sleep(delay);
}
}
socket.getOutputStream().flush();
} catch (final Exception e) {
LOGGER.error(e.getMessage(), e);
} finally {
IOUtils.closeQuietly(socket);
}
}
public void close() {
IOUtils.closeQuietly(socket);
} }
return socket;
} }
private void enableSslContextService(final SSLContext sslContext) throws InitializationException { private void enableSslContextService(final SSLContext sslContext) throws InitializationException {

View File

@ -16,23 +16,20 @@
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.StandardEvent; import org.apache.nifi.processor.util.listen.event.StandardEvent;
import org.apache.nifi.processor.util.listen.response.ChannelResponder; import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -47,34 +44,21 @@ import java.util.concurrent.BlockingQueue;
public class TestListenUDP { public class TestListenUDP {
private static final String LOCALHOST = "localhost";
private int port = 0; private int port = 0;
private ListenUDP proc;
private TestRunner runner; private TestRunner runner;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.ListenUDP", "debug");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestListenUDP", "debug");
}
@AfterClass
public static void tearDownAfterClass() {
System.setProperty("org.slf4j.simpleLogger.showDateTime", "false");
}
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
proc = new ListenUDP(); runner = TestRunners.newTestRunner(ListenUDP.class);
runner = TestRunners.newTestRunner(proc); port = NetworkUtils.availablePort();
runner.setProperty(ListenUDP.PORT, String.valueOf(port)); runner.setProperty(ListenUDP.PORT, Integer.toString(port));
} }
@Test @Test
public void testCustomValidation() { public void testCustomValidation() {
runner.assertNotValid();
runner.setProperty(ListenUDP.PORT, "1"); runner.setProperty(ListenUDP.PORT, "1");
runner.assertValid(); runner.assertValid();
@ -110,15 +94,13 @@ public class TestListenUDP {
runner.setProperty(ListenUDP.MAX_MESSAGE_QUEUE_SIZE, String.valueOf(maxQueueSize)); runner.setProperty(ListenUDP.MAX_MESSAGE_QUEUE_SIZE, String.valueOf(maxQueueSize));
final List<String> messages = getMessages(20); final List<String> messages = getMessages(20);
final int expectedQueued = maxQueueSize;
final int expectedTransferred = maxQueueSize;
run(new DatagramSocket(), messages, expectedQueued, expectedTransferred); run(new DatagramSocket(), messages, maxQueueSize, maxQueueSize);
runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, maxQueueSize); runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, maxQueueSize);
List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS); List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
verifyFlowFiles(mockFlowFiles); verifyFlowFiles(mockFlowFiles);
verifyProvenance(expectedTransferred); verifyProvenance(maxQueueSize);
} }
@Test @Test
@ -146,7 +128,7 @@ public class TestListenUDP {
} }
@Test @Test
public void testBatchingWithDifferentSenders() throws IOException, InterruptedException { public void testBatchingWithDifferentSenders() {
final String sender1 = "sender1"; final String sender1 = "sender1";
final String sender2 = "sender2"; final String sender2 = "sender2";
final ChannelResponder responder = Mockito.mock(ChannelResponder.class); final ChannelResponder responder = Mockito.mock(ChannelResponder.class);
@ -164,7 +146,6 @@ public class TestListenUDP {
runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10"); runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
// sending 4 messages with a batch size of 10, but should get 2 FlowFiles because of different senders // sending 4 messages with a batch size of 10, but should get 2 FlowFiles because of different senders
runner.run(); runner.run();
runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2); runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
@ -172,7 +153,7 @@ public class TestListenUDP {
} }
@Test @Test
public void testRunWhenNoEventsAvailable() throws IOException, InterruptedException { public void testRunWhenNoEventsAvailable() {
final List<StandardEvent> mockEvents = new ArrayList<>(); final List<StandardEvent> mockEvents = new ArrayList<>();
MockListenUDP mockListenUDP = new MockListenUDP(mockEvents); MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
@ -206,26 +187,6 @@ public class TestListenUDP {
verifyProvenance(expectedTransferred); verifyProvenance(expectedTransferred);
} }
@Test
public void testWithSendingHostAndPortDifferentThanSender() throws IOException, InterruptedException {
final String sendingHost = "localhost";
final Integer sendingPort = 21001;
runner.setProperty(ListenUDP.SENDING_HOST, sendingHost);
runner.setProperty(ListenUDP.SENDING_HOST_PORT, String.valueOf(sendingPort));
// bind to a different sending port than the processor has for Sending Host Port
final DatagramSocket socket = new DatagramSocket(21002);
// no messages should come through since we are listening for 21001 and sending from 21002
final List<String> messages = getMessages(6);
final int expectedQueued = 0;
final int expectedTransferred = 0;
run(socket, messages, expectedQueued, expectedTransferred);
runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, 0);
}
private List<String> getMessages(int numMessages) { private List<String> getMessages(int numMessages) {
final List<String> messages = new ArrayList<>(); final List<String> messages = new ArrayList<>();
for (int i=0; i < numMessages; i++) { for (int i=0; i < numMessages; i++) {
@ -256,54 +217,25 @@ public class TestListenUDP {
protected void run(final DatagramSocket socket, final List<String> messages, final int expectedQueueSize, final int expectedTransferred) protected void run(final DatagramSocket socket, final List<String> messages, final int expectedQueueSize, final int expectedTransferred)
throws IOException, InterruptedException { throws IOException, InterruptedException {
// Run Processor and start Dispatcher without shutting down
runner.run(1, false, true);
try { try {
// schedule to start listening on a random port final InetSocketAddress destination = new InetSocketAddress(LOCALHOST, port);
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
final ProcessContext context = runner.getProcessContext();
proc.onScheduled(context);
Thread.sleep(100);
// get the real port the dispatcher is listening on
final int destPort = proc.getDispatcherPort();
final InetSocketAddress destination = new InetSocketAddress("localhost", destPort);
// send the messages to the port the processors is listening on
for (final String message : messages) { for (final String message : messages) {
final byte[] buffer = message.getBytes(StandardCharsets.UTF_8); final byte[] buffer = message.getBytes(StandardCharsets.UTF_8);
final DatagramPacket packet = new DatagramPacket(buffer, buffer.length, destination); final DatagramPacket packet = new DatagramPacket(buffer, buffer.length, destination);
socket.send(packet); socket.send(packet);
Thread.sleep(10);
} }
long responseTimeout = 10000; // Run Processor for number of responses
runner.run(expectedTransferred, false, false);
// this first loop waits until the internal queue of the processor has the expected
// number of messages ready before proceeding, we want to guarantee they are all there
// before onTrigger gets a chance to run
long startTimeQueueSizeCheck = System.currentTimeMillis();
while (proc.getQueueSize() < expectedQueueSize
&& (System.currentTimeMillis() - startTimeQueueSizeCheck < responseTimeout)) {
Thread.sleep(100);
}
// want to fail here if the queue size isn't what we expect
Assert.assertEquals(expectedQueueSize, proc.getQueueSize());
// call onTrigger until we processed all the messages, or a certain amount of time passes
int numTransferred = 0;
long startTime = System.currentTimeMillis();
while (numTransferred < expectedTransferred && (System.currentTimeMillis() - startTime < responseTimeout)) {
proc.onTrigger(context, processSessionFactory);
numTransferred = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS).size();
Thread.sleep(100);
}
// should have transferred the expected events
runner.assertTransferCount(ListenUDP.REL_SUCCESS, expectedTransferred); runner.assertTransferCount(ListenUDP.REL_SUCCESS, expectedTransferred);
} finally { } finally {
// unschedule to close connections runner.shutdown();
proc.onUnscheduled();
IOUtils.closeQuietly(socket);
} }
} }
@ -324,7 +256,7 @@ public class TestListenUDP {
} }
@Override @Override
protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<StandardEvent> events) throws IOException { protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<StandardEvent> events) {
return Mockito.mock(ChannelDispatcher.class); return Mockito.mock(ChannelDispatcher.class);
} }

View File

@ -14,18 +14,246 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import org.apache.nifi.processors.standard.util.TestPutTCPCommon; import org.apache.commons.lang3.ArrayUtils;
import org.apache.nifi.processors.standard.util.TCPTestServer;
import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestPutTCP extends TestPutTCPCommon { import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocketFactory;
import java.net.InetAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
public TestPutTCP() { import static org.junit.Assert.assertArrayEquals;
super(); import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
public class TestPutTCP {
private final static String TCP_SERVER_ADDRESS = "127.0.0.1";
private final static String SERVER_VARIABLE = "server.address";
private final static String TCP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE + "}";
private final static int MIN_INVALID_PORT = 0;
private final static int MIN_VALID_PORT = 1;
private final static int MAX_VALID_PORT = 65535;
private final static int MAX_INVALID_PORT = 65536;
private final static int BUFFER_SIZE = 1024;
private final static int VALID_LARGE_FILE_SIZE = 32768;
private final static int VALID_SMALL_FILE_SIZE = 64;
private final static int LOAD_TEST_ITERATIONS = 500;
private final static int LOAD_TEST_THREAD_COUNT = 1;
private final static int DEFAULT_ITERATIONS = 1;
private final static int DEFAULT_THREAD_COUNT = 1;
private final static char CONTENT_CHAR = 'x';
private final static int DEFAULT_TEST_TIMEOUT_PERIOD = 10000;
private final static int LONG_TEST_TIMEOUT_PERIOD = 300000;
private final static String OUTGOING_MESSAGE_DELIMITER = "\n";
private final static String OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR = "{delimiter}\r\n";
private final static String[] EMPTY_FILE = { "" };
private final static String[] VALID_FILES = { "abcdefghijklmnopqrstuvwxyz", "zyxwvutsrqponmlkjihgfedcba", "12345678", "343424222", "!@£$%^&*()_+:|{}[];\\" };
private TCPTestServer server;
private int port;
private ArrayBlockingQueue<List<Byte>> received;
private TestRunner runner;
@Before
public void setup() throws Exception {
received = new ArrayBlockingQueue<>(BUFFER_SIZE);
runner = TestRunners.newTestRunner(PutTCP.class);
runner.setVariable(SERVER_VARIABLE, TCP_SERVER_ADDRESS);
} }
@Override @After
public void configureProperties(String host, int port, String outgoingMessageDelimiter, boolean connectionPerFlowFile, boolean expectValid) { public void cleanup() {
runner.shutdown();
removeTestServer(server);
}
@Test
public void testPortProperty() {
runner.setProperty(PutTCP.PORT, Integer.toString(MIN_INVALID_PORT));
runner.assertNotValid();
runner.setProperty(PutTCP.PORT, Integer.toString(MIN_VALID_PORT));
runner.assertValid();
runner.setProperty(PutTCP.PORT, Integer.toString(MAX_VALID_PORT));
runner.assertValid();
runner.setProperty(PutTCP.PORT, Integer.toString(MAX_INVALID_PORT));
runner.assertNotValid();
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testRunSuccess() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES);
assertEquals("Server Connections not matched", server.getTotalNumConnections(), 1);
}
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
public void testRunSuccessSslContextService() throws Exception {
final TlsConfiguration tlsConfiguration = KeyStoreUtils.createTlsConfigAndNewKeystoreTruststore();
try {
final SSLContext sslContext = SslContextFactory.createSslContext(tlsConfiguration);
assertNotNull("SSLContext not found", sslContext);
final String identifier = SSLContextService.class.getName();
final SSLContextService sslContextService = Mockito.mock(SSLContextService.class);
Mockito.when(sslContextService.getIdentifier()).thenReturn(identifier);
Mockito.when(sslContextService.createContext()).thenReturn(sslContext);
runner.addControllerService(identifier, sslContextService);
runner.enableControllerService(sslContextService);
runner.setProperty(PutTCP.SSL_CONTEXT_SERVICE, identifier);
final SSLServerSocketFactory serverSocketFactory = sslContext.getServerSocketFactory();
createTestServer(OUTGOING_MESSAGE_DELIMITER, false, serverSocketFactory);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES);
assertEquals("Server Connections not matched", server.getTotalNumConnections(), 1);
} finally {
Files.deleteIfExists(Paths.get(tlsConfiguration.getKeystorePath()));
Files.deleteIfExists(Paths.get(tlsConfiguration.getTruststorePath()));
}
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testRunSuccessServerVariableExpression() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS_EL, OUTGOING_MESSAGE_DELIMITER, false);
sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES);
assertEquals("Server Connections not matched", server.getTotalNumConnections(), 1);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testRunSuccessPruneSenders() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
sendTestData(VALID_FILES);
assertTransfers(VALID_FILES.length);
assertMessagesReceived(VALID_FILES);
assertEquals("Server Connections not matched", server.getTotalNumConnections(), 1);
runner.setProperty(PutTCP.IDLE_EXPIRATION, "500 ms");
Thread.sleep(1000);
runner.run(1, false, false);
runner.clearTransferState();
sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES);
assertEquals("Server Connections after prune senders not matched", server.getTotalNumConnections(), 2);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testRunSuccessMultiCharDelimiter() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false);
sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES);
assertEquals("Server Connections not matched", server.getTotalNumConnections(), 1);
}
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
public void testRunSuccessConnectionPerFlowFile() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER, true);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES);
assertEquals("Server Connections not matched", server.getTotalNumConnections(), VALID_FILES.length);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testRunSuccessConnectionFailure() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES);
assertEquals("Server Connections not matched", server.getTotalNumConnections(), 1);
removeTestServer(server);
runner.clearTransferState();
sendTestData(VALID_FILES);
Thread.sleep(500);
assertNull("Unexpected Data Received", received.poll());
runner.assertQueueEmpty();
assertEquals("Server Connections after restart not matched", server.getTotalNumConnections(), 1);
createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES);
assertEquals("Server Connections not matched", server.getTotalNumConnections(), 1);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testRunSuccessEmptyFile() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
sendTestData(EMPTY_FILE);
assertTransfers(EMPTY_FILE.length);
runner.assertQueueEmpty();
assertEquals("Server Connections not matched", server.getTotalNumConnections(), 1);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testRunSuccessLargeValidFile() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
final String[] testData = createContent(VALID_LARGE_FILE_SIZE);
sendTestData(testData);
assertMessagesReceived(testData);
assertEquals("Server Connections not matched", server.getTotalNumConnections(), testData.length);
}
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
public void testRunSuccessFiveHundredMessages() throws Exception {
createTestServer(OUTGOING_MESSAGE_DELIMITER);
Thread.sleep(1000);
final String[] testData = createContent(VALID_SMALL_FILE_SIZE);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT);
assertMessagesReceived(testData, LOAD_TEST_ITERATIONS);
assertEquals("Server Connections not matched", server.getTotalNumConnections(), 1);
}
private void createTestServer(final String delimiter) throws Exception {
createTestServer(delimiter, false);
}
private void createTestServer(final String delimiter, final boolean closeOnMessageReceived) throws Exception {
createTestServer(delimiter, closeOnMessageReceived, ServerSocketFactory.getDefault());
}
private void createTestServer(final String delimiter, final boolean closeOnMessageReceived, final ServerSocketFactory serverSocketFactory) throws Exception {
server = new TCPTestServer(InetAddress.getByName(TCP_SERVER_ADDRESS), received, delimiter, closeOnMessageReceived);
server.startServer(serverSocketFactory);
port = server.getPort();
}
private void removeTestServer(final TCPTestServer server) {
if (server != null) {
server.shutdown();
}
}
private void configureProperties(String host, String outgoingMessageDelimiter, boolean connectionPerFlowFile) {
runner.setProperty(PutTCP.HOSTNAME, host); runner.setProperty(PutTCP.HOSTNAME, host);
runner.setProperty(PutTCP.PORT, Integer.toString(port)); runner.setProperty(PutTCP.PORT, Integer.toString(port));
if (outgoingMessageDelimiter != null) { if (outgoingMessageDelimiter != null) {
@ -33,11 +261,56 @@ public class TestPutTCP extends TestPutTCPCommon {
} }
runner.setProperty(PutTCP.CONNECTION_PER_FLOWFILE, String.valueOf(connectionPerFlowFile)); runner.setProperty(PutTCP.CONNECTION_PER_FLOWFILE, String.valueOf(connectionPerFlowFile));
runner.assertValid();
}
if (expectValid) { private void sendTestData(final String[] testData) {
runner.assertValid(); sendTestData(testData, DEFAULT_ITERATIONS, DEFAULT_THREAD_COUNT);
} else { }
runner.assertNotValid();
private void sendTestData(final String[] testData, final int iterations, final int threadCount) {
runner.setThreadCount(threadCount);
for (int i = 0; i < iterations; i++) {
for (String item : testData) {
runner.enqueue(item.getBytes());
}
runner.run(testData.length, false, i == 0);
} }
} }
private void assertTransfers(final int successCount) {
runner.assertTransferCount(PutTCP.REL_SUCCESS, successCount);
runner.assertTransferCount(PutTCP.REL_FAILURE, 0);
}
private void assertMessagesReceived(final String[] sentData) throws Exception {
assertMessagesReceived(sentData, DEFAULT_ITERATIONS);
runner.assertQueueEmpty();
}
private void assertMessagesReceived(final String[] sentData, final int iterations) throws Exception {
for (int i = 0; i < iterations; i++) {
for (String item : sentData) {
List<Byte> message = received.take();
assertNotNull(String.format("Message [%d] not found", i), message);
Byte[] messageBytes = new Byte[message.size()];
assertArrayEquals(item.getBytes(), ArrayUtils.toPrimitive(message.toArray(messageBytes)));
}
}
runner.assertTransferCount(PutTCP.REL_SUCCESS, sentData.length * iterations);
runner.clearTransferState();
assertNull("Unexpected Message Found", received.poll());
}
private String[] createContent(final int size) {
final char[] content = new char[size];
for (int i = 0; i < size; i++) {
content[i] = CONTENT_CHAR;
}
return new String[] { new String(content) };
}
} }

View File

@ -1,82 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.processors.standard.util.TestPutTCPCommon;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.KeystoreType;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.StandardTlsConfiguration;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.SSLContextService;
import org.junit.BeforeClass;
import org.mockito.Mockito;
import javax.net.ssl.SSLContext;
public class TestPutTcpSSL extends TestPutTCPCommon {
private static final String TLS_PROTOCOL = "TLSv1.2";
private static SSLContext sslContext;
@BeforeClass
public static void configureServices() throws TlsException {
final TlsConfiguration configuration = new StandardTlsConfiguration(
"src/test/resources/keystore.jks",
"passwordpassword",
"passwordpassword",
KeystoreType.JKS,
"src/test/resources/truststore.jks",
"passwordpassword",
KeystoreType.JKS,
TLS_PROTOCOL
);
sslContext = SslContextFactory.createSslContext(configuration);
}
public TestPutTcpSSL() {
super();
serverSocketFactory = sslContext.getServerSocketFactory();
}
@Override
public void configureProperties(String host, int port, String outgoingMessageDelimiter, boolean connectionPerFlowFile, boolean expectValid) throws InitializationException {
runner.setProperty(PutTCP.HOSTNAME, host);
runner.setProperty(PutTCP.PORT, Integer.toString(port));
final SSLContextService sslContextService = Mockito.mock(SSLContextService.class);
final String serviceIdentifier = SSLContextService.class.getName();
Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
Mockito.when(sslContextService.createContext()).thenReturn(sslContext);
runner.addControllerService(serviceIdentifier, sslContextService);
runner.enableControllerService(sslContextService);
runner.setProperty(PutTCP.SSL_CONTEXT_SERVICE, serviceIdentifier);
if (outgoingMessageDelimiter != null) {
runner.setProperty(PutTCP.OUTGOING_MESSAGE_DELIMITER, outgoingMessageDelimiter);
}
runner.setProperty(PutTCP.CONNECTION_PER_FLOWFILE, String.valueOf(connectionPerFlowFile));
if (expectValid) {
runner.assertValid();
} else {
runner.assertNotValid();
}
}
}

View File

@ -1,339 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.nifi.processors.standard.PutTCP;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import javax.net.ServerSocketFactory;
import java.net.InetAddress;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
public abstract class TestPutTCPCommon {
private final static String TCP_SERVER_ADDRESS = "127.0.0.1";
private final static String SERVER_VARIABLE = "ALKJAFLKJDFLSKJSDFLKJSDF";
private final static String TCP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE + "}";
private final static String UNKNOWN_HOST = "fgdsfgsdffd";
private final static String INVALID_IP_ADDRESS = "300.300.300.300";
private final static int MIN_INVALID_PORT = 0;
private final static int MIN_VALID_PORT = 1;
private final static int MAX_VALID_PORT = 65535;
private final static int MAX_INVALID_PORT = 65536;
private final static int BUFFER_SIZE = 1024;
private final static int VALID_LARGE_FILE_SIZE = 32768;
private final static int VALID_SMALL_FILE_SIZE = 64;
private final static int LOAD_TEST_ITERATIONS = 500;
private final static int LOAD_TEST_THREAD_COUNT = 1;
private final static int DEFAULT_ITERATIONS = 1;
private final static int DEFAULT_THREAD_COUNT = 1;
private final static char CONTENT_CHAR = 'x';
private final static int DATA_WAIT_PERIOD = 1000;
private final static int DEFAULT_TEST_TIMEOUT_PERIOD = 10000;
private final static int LONG_TEST_TIMEOUT_PERIOD = 180000;
private final static String OUTGOING_MESSAGE_DELIMITER = "\n";
private final static String OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR = "{delimiter}\r\n";
private TCPTestServer server;
private int port;
private ArrayBlockingQueue<List<Byte>> recvQueue;
public ServerSocketFactory serverSocketFactory;
public TestRunner runner;
// Test Data
private final static String[] EMPTY_FILE = { "" };
private final static String[] VALID_FILES = { "abcdefghijklmnopqrstuvwxyz", "zyxwvutsrqponmlkjihgfedcba", "12345678", "343424222", "!@£$%^&*()_+:|{}[];\\" };
@BeforeClass
public static void setUpSuite() {
Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS);
}
@Before
public void setup() throws Exception {
recvQueue = new ArrayBlockingQueue<>(BUFFER_SIZE);
runner = TestRunners.newTestRunner(PutTCP.class);
runner.setVariable(SERVER_VARIABLE, TCP_SERVER_ADDRESS);
}
private TCPTestServer createTestServer(final ArrayBlockingQueue<List<Byte>> queue, final String delimiter, final boolean closeOnMessageReceived) throws Exception {
TCPTestServer server = new TCPTestServer(InetAddress.getByName(TCP_SERVER_ADDRESS), queue, delimiter, closeOnMessageReceived);
server.startServer(serverSocketFactory);
port = server.getPort();
return server;
}
private TCPTestServer createTestServer(final ArrayBlockingQueue<List<Byte>> queue, final String delimiter) throws Exception {
return createTestServer(queue, delimiter, false);
}
@After
public void cleanup() {
runner.shutdown();
removeTestServer(server);
}
private void removeTestServer(TCPTestServer server) {
if (server != null) {
server.shutdown();
}
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testValidFiles() throws Exception {
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 1);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testValidFilesEL() throws Exception {
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS_EL, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 1);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testPruneSenders() throws Exception {
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
Thread.sleep(10);
checkRelationships(VALID_FILES.length, 0);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 1);
runner.setProperty(PutTCP.IDLE_EXPIRATION, "500 ms");
Thread.sleep(1000);
runner.run(1, false, false);
runner.clearTransferState();
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 2);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testMultiCharDelimiter() throws Exception {
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR);
configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 1);
}
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
public void testConnectionPerFlowFile() throws Exception {
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER, true);
configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, true, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, VALID_FILES.length);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testConnectionFailure() throws Exception {
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 1);
removeTestServer(server);
runner.clearTransferState();
sendTestData(VALID_FILES);
Thread.sleep(10);
checkNoDataReceived(recvQueue);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 1);
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
checkReceivedAllData(recvQueue, VALID_FILES);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 1);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testEmptyFile() throws Exception {
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(EMPTY_FILE);
Thread.sleep(10);
checkRelationships(EMPTY_FILE.length, 0);
checkEmptyMessageReceived(recvQueue);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 1);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testLargeValidFile() throws Exception {
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, true, true);
final String[] testData = createContent(VALID_LARGE_FILE_SIZE);
sendTestData(testData);
checkReceivedAllData(recvQueue, testData);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, testData.length);
}
@Ignore("This test is failing intermittently as documented in NIFI-4288")
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
public void testInvalidIPAddress() throws Exception {
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(INVALID_IP_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
Thread.sleep(10);
checkRelationships(0, VALID_FILES.length);
checkNoDataReceived(recvQueue);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 0);
}
@Ignore("This test is failing intermittently as documented in NIFI-4288")
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
public void testUnknownHostname() throws Exception {
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
configureProperties(UNKNOWN_HOST, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(VALID_FILES);
Thread.sleep(10);
checkRelationships(0, VALID_FILES.length);
checkNoDataReceived(recvQueue);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 0);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testInvalidPort() throws Exception {
configureProperties(UNKNOWN_HOST, MIN_INVALID_PORT, OUTGOING_MESSAGE_DELIMITER, false, false);
configureProperties(UNKNOWN_HOST, MIN_VALID_PORT, OUTGOING_MESSAGE_DELIMITER, false, true);
configureProperties(UNKNOWN_HOST, MAX_VALID_PORT, OUTGOING_MESSAGE_DELIMITER, false, true);
configureProperties(UNKNOWN_HOST, MAX_INVALID_PORT, OUTGOING_MESSAGE_DELIMITER, false, false);
}
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
public void testLoadTest() throws Exception {
server = createTestServer(recvQueue, OUTGOING_MESSAGE_DELIMITER);
Thread.sleep(1000);
final String[] testData = createContent(VALID_SMALL_FILE_SIZE);
configureProperties(TCP_SERVER_ADDRESS, port, OUTGOING_MESSAGE_DELIMITER, false, true);
sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT);
checkReceivedAllData(recvQueue, testData, LOAD_TEST_ITERATIONS);
checkInputQueueIsEmpty();
checkTotalNumConnections(server, 1);
}
private void checkTotalNumConnections(final TCPTestServer server, final int expectedTotalNumConnections) {
assertEquals(expectedTotalNumConnections, server.getTotalNumConnections());
}
public abstract void configureProperties(final String host, final int port, final String outgoingMessageDelimiter, final boolean connectionPerFlowFile,
final boolean expectValid) throws InitializationException;
private void sendTestData(final String[] testData) {
sendTestData(testData, DEFAULT_ITERATIONS, DEFAULT_THREAD_COUNT);
}
private void sendTestData(final String[] testData, final int iterations, final int threadCount) {
runner.setThreadCount(threadCount);
for (int i = 0; i < iterations; i++) {
for (String item : testData) {
runner.enqueue(item.getBytes());
}
runner.run(testData.length, false, i == 0);
}
}
private void checkRelationships(final int successCount, final int failedCount) {
runner.assertTransferCount(PutTCP.REL_SUCCESS, successCount);
runner.assertTransferCount(PutTCP.REL_FAILURE, failedCount);
}
private void checkNoDataReceived(final ArrayBlockingQueue<List<Byte>> recvQueue) throws Exception {
Thread.sleep(DATA_WAIT_PERIOD);
assertNull(recvQueue.poll());
}
private void checkEmptyMessageReceived(final ArrayBlockingQueue<List<Byte>> recvQueue) throws Exception {
Thread.sleep(DATA_WAIT_PERIOD);
final List<Byte> message = recvQueue.poll();
assertNotNull(message);
assertEquals(0, message.size());
}
private void checkInputQueueIsEmpty() {
runner.assertQueueEmpty();
}
private void checkReceivedAllData(final ArrayBlockingQueue<List<Byte>> recvQueue, final String[] sentData) throws Exception {
checkReceivedAllData(recvQueue, sentData, DEFAULT_ITERATIONS);
}
private void checkReceivedAllData(final ArrayBlockingQueue<List<Byte>> recvQueue, final String[] sentData, final int iterations) throws Exception {
// check each sent FlowFile was successfully sent and received.
for (int i = 0; i < iterations; i++) {
for (String item : sentData) {
List<Byte> message = recvQueue.take();
assertNotNull(message);
Byte[] messageBytes = new Byte[message.size()];
assertArrayEquals(item.getBytes(), ArrayUtils.toPrimitive(message.toArray(messageBytes)));
}
}
runner.assertTransferCount(PutTCP.REL_SUCCESS, sentData.length * iterations);
runner.clearTransferState();
// Check that we have no unexpected extra data.
assertNull(recvQueue.poll());
}
private String[] createContent(final int size) {
final char[] content = new char[size];
for (int i = 0; i < size; i++) {
content[i] = CONTENT_CHAR;
}
return new String[] { new String(content) };
}
}