mirror of
https://github.com/apache/nifi.git
synced 2025-02-07 18:48:51 +00:00
NIFI-9346 Added closing of EventSender to TestListenRELP
Signed-off-by: Nathan Gough <thenatog@gmail.com> This closes #5492.
This commit is contained in:
parent
9865ea2bfb
commit
16e6045d13
@ -20,6 +20,8 @@ import org.apache.commons.lang3.StringUtils;
|
|||||||
import org.apache.ftpserver.ssl.ClientAuth;
|
import org.apache.ftpserver.ssl.ClientAuth;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||||
import org.apache.nifi.event.transport.EventSender;
|
import org.apache.nifi.event.transport.EventSender;
|
||||||
|
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
|
||||||
|
import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
|
||||||
import org.apache.nifi.event.transport.configuration.TransportProtocol;
|
import org.apache.nifi.event.transport.configuration.TransportProtocol;
|
||||||
import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
|
import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
@ -31,8 +33,6 @@ import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
|
|||||||
import org.apache.nifi.provenance.ProvenanceEventRecord;
|
import org.apache.nifi.provenance.ProvenanceEventRecord;
|
||||||
import org.apache.nifi.provenance.ProvenanceEventType;
|
import org.apache.nifi.provenance.ProvenanceEventType;
|
||||||
import org.apache.nifi.remote.io.socket.NetworkUtils;
|
import org.apache.nifi.remote.io.socket.NetworkUtils;
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
|
||||||
import org.apache.nifi.security.util.TlsException;
|
|
||||||
import org.apache.nifi.ssl.RestrictedSSLContextService;
|
import org.apache.nifi.ssl.RestrictedSSLContextService;
|
||||||
import org.apache.nifi.ssl.SSLContextService;
|
import org.apache.nifi.ssl.SSLContextService;
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
@ -109,7 +109,7 @@ public class TestListenRELP {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRELPFramesAreReceivedSuccessfully() throws IOException {
|
public void testRELPFramesAreReceivedSuccessfully() throws Exception {
|
||||||
final int relpFrames = 5;
|
final int relpFrames = 5;
|
||||||
final List<RELPFrame> frames = getFrames(relpFrames);
|
final List<RELPFrame> frames = getFrames(relpFrames);
|
||||||
|
|
||||||
@ -135,7 +135,7 @@ public class TestListenRELP {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRELPFramesAreReceivedSuccessfullyWhenBatched() throws IOException {
|
public void testRELPFramesAreReceivedSuccessfullyWhenBatched() throws Exception {
|
||||||
|
|
||||||
runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "5");
|
runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "5");
|
||||||
|
|
||||||
@ -164,9 +164,7 @@ public class TestListenRELP {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRunMutualTls() throws IOException, TlsException, InitializationException {
|
public void testRunMutualTls() throws Exception {
|
||||||
|
|
||||||
|
|
||||||
final String serviceIdentifier = SSLContextService.class.getName();
|
final String serviceIdentifier = SSLContextService.class.getName();
|
||||||
when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
|
when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
|
||||||
final SSLContext sslContext = SslContextUtils.createKeyStoreSslContext();
|
final SSLContext sslContext = SslContextUtils.createKeyStoreSslContext();
|
||||||
@ -206,9 +204,7 @@ public class TestListenRELP {
|
|||||||
runner.shutdown();
|
runner.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void run(final List<RELPFrame> frames, final int flowFiles, final SSLContext sslContext)
|
private void run(final List<RELPFrame> frames, final int flowFiles, final SSLContext sslContext) throws Exception {
|
||||||
throws IOException {
|
|
||||||
|
|
||||||
final int port = NetworkUtils.availablePort();
|
final int port = NetworkUtils.availablePort();
|
||||||
runner.setProperty(AbstractListenEventBatchingProcessor.PORT, Integer.toString(port));
|
runner.setProperty(AbstractListenEventBatchingProcessor.PORT, Integer.toString(port));
|
||||||
// Run Processor and start Dispatcher without shutting down
|
// Run Processor and start Dispatcher without shutting down
|
||||||
@ -243,18 +239,21 @@ public class TestListenRELP {
|
|||||||
return frames;
|
return frames;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendMessages(final int port, final byte[] relpMessages, final SSLContext sslContext) {
|
private void sendMessages(final int port, final byte[] relpMessages, final SSLContext sslContext) throws Exception {
|
||||||
final ByteArrayNettyEventSenderFactory eventSenderFactory = new ByteArrayNettyEventSenderFactory(runner.getLogger(), LOCALHOST, port, TransportProtocol.TCP);
|
final ByteArrayNettyEventSenderFactory eventSenderFactory = new ByteArrayNettyEventSenderFactory(runner.getLogger(), LOCALHOST, port, TransportProtocol.TCP);
|
||||||
|
eventSenderFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
|
||||||
|
eventSenderFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
|
||||||
if (sslContext != null) {
|
if (sslContext != null) {
|
||||||
eventSenderFactory.setSslContext(sslContext);
|
eventSenderFactory.setSslContext(sslContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
eventSenderFactory.setTimeout(SENDER_TIMEOUT);
|
eventSenderFactory.setTimeout(SENDER_TIMEOUT);
|
||||||
EventSender<byte[]> eventSender = eventSenderFactory.getEventSender();
|
try (final EventSender<byte[]> eventSender = eventSenderFactory.getEventSender()) {
|
||||||
eventSender.sendEvent(relpMessages);
|
eventSender.sendEvent(relpMessages);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class MockListenRELP extends ListenRELP {
|
private static class MockListenRELP extends ListenRELP {
|
||||||
private final List<RELPMessage> mockEvents;
|
private final List<RELPMessage> mockEvents;
|
||||||
|
|
||||||
public MockListenRELP() {
|
public MockListenRELP() {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user