NIFI-8304 This closes #4907. Replaced HttpsURLConnection with OkHttpClient in TestListenHTTP

NIFI-8304 Updated TestPutTCP to shutdown server before checking connections
NIFI-8304 Changed TestListenTCP to send messages in one byte array
NIFI-8304 Added check for expected jdk.tls.disabledAlgorithms
This commit is contained in:
exceptionfactory 2021-03-17 07:58:36 -05:00 committed by Joe Witt
parent cf4e966d91
commit 8202bffc98
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
5 changed files with 137 additions and 136 deletions

View File

@ -24,7 +24,6 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.KeyStoreUtils; import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.StandardTlsConfiguration; import org.apache.nifi.security.util.StandardTlsConfiguration;
import org.apache.nifi.security.util.TlsConfiguration; import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.security.util.TlsException; import org.apache.nifi.security.util.TlsException;
@ -33,6 +32,7 @@ import org.apache.nifi.util.LogMessage;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
@ -820,10 +820,10 @@ public class InvokeHTTPTest {
private void setSslContextConfiguration(final TlsConfiguration serverTlsConfiguration, final TlsConfiguration clientTlsConfiguration) throws InitializationException, TlsException { private void setSslContextConfiguration(final TlsConfiguration serverTlsConfiguration, final TlsConfiguration clientTlsConfiguration) throws InitializationException, TlsException {
final SSLContextService sslContextService = setSslContextService(); final SSLContextService sslContextService = setSslContextService();
final SSLContext serverSslContext = getSslContext(serverTlsConfiguration); final SSLContext serverSslContext = SslContextUtils.createSslContext(serverTlsConfiguration);
setMockWebServerSslSocketFactory(serverSslContext); setMockWebServerSslSocketFactory(serverSslContext);
final SSLContext clientSslContext = getSslContext(clientTlsConfiguration); final SSLContext clientSslContext = SslContextUtils.createSslContext(clientTlsConfiguration);
when(sslContextService.createContext()).thenReturn(clientSslContext); when(sslContextService.createContext()).thenReturn(clientSslContext);
when(sslContextService.createTlsConfiguration()).thenReturn(clientTlsConfiguration); when(sslContextService.createTlsConfiguration()).thenReturn(clientTlsConfiguration);
} }
@ -848,12 +848,4 @@ public class InvokeHTTPTest {
} }
mockWebServer.useHttps(sslSocketFactory, false); mockWebServer.useHttps(sslSocketFactory, false);
} }
private SSLContext getSslContext(final TlsConfiguration configuration) throws TlsException {
final SSLContext sslContext = SslContextFactory.createSslContext(configuration);
if (sslContext == null) {
throw new IllegalArgumentException("SSLContext not found for TLS Configuration");
}
return sslContext;
}
} }

View File

@ -16,25 +16,20 @@
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import com.google.common.base.Charsets;
import java.io.DataOutputStream;
import java.io.File; import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.net.URL; import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.security.GeneralSecurityException; import java.security.GeneralSecurityException;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException; import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException; import javax.net.ssl.SSLHandshakeException;
@ -42,6 +37,7 @@ import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLSocket; import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import okhttp3.MediaType; import okhttp3.MediaType;
import okhttp3.MultipartBody; import okhttp3.MultipartBody;
@ -49,7 +45,6 @@ import okhttp3.OkHttpClient;
import okhttp3.Request; import okhttp3.Request;
import okhttp3.RequestBody; import okhttp3.RequestBody;
import okhttp3.Response; import okhttp3.Response;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.ProcessSessionFactory;
@ -64,6 +59,7 @@ import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.thread.ThreadPool; import org.eclipse.jetty.util.thread.ThreadPool;
import org.junit.After; import org.junit.After;
@ -83,7 +79,7 @@ public class TestListenHTTP {
private static final String SSL_CONTEXT_SERVICE_IDENTIFIER = "ssl-context"; private static final String SSL_CONTEXT_SERVICE_IDENTIFIER = "ssl-context";
private static final String HTTP_POST_METHOD = "POST"; private static final MediaType APPLICATION_OCTET_STREAM = MediaType.get("application/octet-stream");
private static final String HTTP_BASE_PATH = "basePath"; private static final String HTTP_BASE_PATH = "basePath";
private final static String PORT_VARIABLE = "HTTP_PORT"; private final static String PORT_VARIABLE = "HTTP_PORT";
@ -99,6 +95,7 @@ public class TestListenHTTP {
private static final int SOCKET_CONNECT_TIMEOUT = 100; private static final int SOCKET_CONNECT_TIMEOUT = 100;
private static final long SERVER_START_TIMEOUT = 1200000; private static final long SERVER_START_TIMEOUT = 1200000;
private static final Duration CLIENT_CALL_TIMEOUT = Duration.ofSeconds(10);
private static TlsConfiguration tlsConfiguration; private static TlsConfiguration tlsConfiguration;
private static TlsConfiguration serverConfiguration; private static TlsConfiguration serverConfiguration;
@ -108,6 +105,7 @@ public class TestListenHTTP {
private static SSLContext serverKeyStoreNoTrustStoreSslContext; private static SSLContext serverKeyStoreNoTrustStoreSslContext;
private static SSLContext keyStoreSslContext; private static SSLContext keyStoreSslContext;
private static SSLContext trustStoreSslContext; private static SSLContext trustStoreSslContext;
private static X509TrustManager trustManager;
private ListenHTTP proc; private ListenHTTP proc;
private TestRunner runner; private TestRunner runner;
@ -150,11 +148,11 @@ public class TestListenHTTP {
TLS_1_2 TLS_1_2
); );
serverKeyStoreSslContext = SslContextFactory.createSslContext(serverConfiguration); serverKeyStoreSslContext = SslContextUtils.createSslContext(serverConfiguration);
final TrustManager[] defaultTrustManagers = SslContextFactory.getTrustManagers(serverNoTruststoreConfiguration); trustManager = SslContextFactory.getX509TrustManager(serverConfiguration);
serverKeyStoreNoTrustStoreSslContext = SslContextFactory.createSslContext(serverNoTruststoreConfiguration, defaultTrustManagers); serverKeyStoreNoTrustStoreSslContext = SslContextFactory.createSslContext(serverNoTruststoreConfiguration, new TrustManager[]{trustManager});
keyStoreSslContext = SslContextFactory.createSslContext(new StandardTlsConfiguration( keyStoreSslContext = SslContextUtils.createSslContext(new StandardTlsConfiguration(
tlsConfiguration.getKeystorePath(), tlsConfiguration.getKeystorePath(),
tlsConfiguration.getKeystorePassword(), tlsConfiguration.getKeystorePassword(),
tlsConfiguration.getKeystoreType(), tlsConfiguration.getKeystoreType(),
@ -162,7 +160,7 @@ public class TestListenHTTP {
tlsConfiguration.getTruststorePassword(), tlsConfiguration.getTruststorePassword(),
tlsConfiguration.getTruststoreType()) tlsConfiguration.getTruststoreType())
); );
trustStoreSslContext = SslContextFactory.createSslContext(new StandardTlsConfiguration( trustStoreSslContext = SslContextUtils.createSslContext(new StandardTlsConfiguration(
null, null,
null, null,
null, null,
@ -354,21 +352,15 @@ public class TestListenHTTP {
public void testSecureServerTrustStoreConfiguredClientAuthenticationRequired() throws Exception { public void testSecureServerTrustStoreConfiguredClientAuthenticationRequired() throws Exception {
configureProcessorSslContextService(ListenHTTP.ClientAuthentication.REQUIRED, serverConfiguration); configureProcessorSslContextService(ListenHTTP.ClientAuthentication.REQUIRED, serverConfiguration);
startSecureServer(); startSecureServer();
final HttpsURLConnection connection = getSecureConnection(trustStoreSslContext); assertThrows(SSLException.class, () -> postMessage(null, true, false));
assertThrows(SSLException.class, connection::getResponseCode);
final HttpsURLConnection clientCertificateConnection = getSecureConnection(keyStoreSslContext);
final int responseCode = clientCertificateConnection.getResponseCode();
assertEquals(HttpServletResponse.SC_METHOD_NOT_ALLOWED, responseCode);
} }
@Test @Test
public void testSecureServerTrustStoreNotConfiguredClientAuthenticationNotRequired() throws Exception { public void testSecureServerTrustStoreNotConfiguredClientAuthenticationNotRequired() throws Exception {
configureProcessorSslContextService(ListenHTTP.ClientAuthentication.AUTO, serverNoTruststoreConfiguration); configureProcessorSslContextService(ListenHTTP.ClientAuthentication.AUTO, serverNoTruststoreConfiguration);
startSecureServer(); startSecureServer();
final HttpsURLConnection connection = getSecureConnection(trustStoreSslContext); final int responseCode = postMessage(null, true, true);
final int responseCode = connection.getResponseCode(); assertEquals(HttpServletResponse.SC_NO_CONTENT, responseCode);
assertEquals(HttpServletResponse.SC_METHOD_NOT_ALLOWED, responseCode);
} }
@Test @Test
@ -462,37 +454,34 @@ public class TestListenHTTP {
startWebServer(); startWebServer();
} }
private HttpsURLConnection getSecureConnection(final SSLContext sslContext) throws Exception {
final URL url = new URL(buildUrl(true));
final HttpsURLConnection connection = (HttpsURLConnection) url.openConnection();
connection.setSSLSocketFactory(sslContext.getSocketFactory());
return connection;
}
private int postMessage(String message, boolean secure, boolean clientAuthRequired) throws Exception { private int postMessage(String message, boolean secure, boolean clientAuthRequired) throws Exception {
String endpointUrl = buildUrl(secure); final OkHttpClient okHttpClient = getOkHttpClient(secure, clientAuthRequired);
final URL url = new URL(endpointUrl); final Request.Builder requestBuilder = new Request.Builder();
final HttpURLConnection connection = (HttpURLConnection) url.openConnection(); final String url = buildUrl(secure);
requestBuilder.url(url);
if (connection instanceof HttpsURLConnection) { final byte[] bytes = message == null ? new byte[]{} : message.getBytes(StandardCharsets.UTF_8);
final HttpsURLConnection httpsConnection = (HttpsURLConnection) connection; final RequestBody requestBody = RequestBody.create(bytes, APPLICATION_OCTET_STREAM);
final Request request = requestBuilder.post(requestBody).build();
try (final Response response = okHttpClient.newCall(request).execute()) {
return response.code();
}
}
private OkHttpClient getOkHttpClient(final boolean secure, final boolean clientAuthRequired) {
final OkHttpClient.Builder builder = new OkHttpClient.Builder();
if (secure) {
if (clientAuthRequired) { if (clientAuthRequired) {
httpsConnection.setSSLSocketFactory(keyStoreSslContext.getSocketFactory()); builder.sslSocketFactory(keyStoreSslContext.getSocketFactory(), trustManager);
} else { } else {
httpsConnection.setSSLSocketFactory(trustStoreSslContext.getSocketFactory()); builder.sslSocketFactory(trustStoreSslContext.getSocketFactory(), trustManager);
} }
} }
connection.setRequestMethod(HTTP_POST_METHOD);
connection.setDoOutput(true);
final DataOutputStream wr = new DataOutputStream(connection.getOutputStream()); builder.callTimeout(CLIENT_CALL_TIMEOUT);
if (message != null) { return builder.build();
wr.writeBytes(message);
}
wr.flush();
wr.close();
return connection.getResponseCode();
} }
private String buildUrl(final boolean secure) { private String buildUrl(final boolean secure) {
@ -603,20 +592,13 @@ public class TestListenHTTP {
.post(multipartBody) .post(multipartBody)
.build(); .build();
int timeout = 3000; final OkHttpClient client = getOkHttpClient(false, false);
OkHttpClient client = new OkHttpClient.Builder()
.readTimeout(timeout, TimeUnit.MILLISECONDS)
.writeTimeout(timeout, TimeUnit.MILLISECONDS)
.build();
try (Response response = client.newCall(request).execute()) { try (Response response = client.newCall(request).execute()) {
Files.deleteIfExists(Paths.get(String.valueOf(file1))); Files.deleteIfExists(Paths.get(String.valueOf(file1)));
Files.deleteIfExists(Paths.get(String.valueOf(file2))); Files.deleteIfExists(Paths.get(String.valueOf(file2)));
Assert.assertTrue(String.format("Unexpected code: %s, body: %s", response.code(), response.body()), response.isSuccessful()); Assert.assertTrue(String.format("Unexpected code: %s, body: %s", response.code(), response.body()), response.isSuccessful());
} }
runner.assertAllFlowFilesTransferred(ListenHTTP.RELATIONSHIP_SUCCESS, 5); runner.assertAllFlowFilesTransferred(ListenHTTP.RELATIONSHIP_SUCCESS, 5);
List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(ListenHTTP.RELATIONSHIP_SUCCESS); List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(ListenHTTP.RELATIONSHIP_SUCCESS);
// Part fragments are not processed in the order we submitted them. // Part fragments are not processed in the order we submitted them.
@ -673,9 +655,7 @@ public class TestListenHTTP {
final File textFile = Files.createTempFile(TestListenHTTP.class.getSimpleName(), ".txt").toFile(); final File textFile = Files.createTempFile(TestListenHTTP.class.getSimpleName(), ".txt").toFile();
textFile.deleteOnExit(); textFile.deleteOnExit();
try (FileOutputStream fos = new FileOutputStream(textFile)) { Files.write(textFile.toPath(), Arrays.asList(lines));
IOUtils.writeLines(Arrays.asList(lines), System.lineSeparator(), fos, Charsets.UTF_8);
}
return textFile; return textFile;
} }

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.remote.io.socket.NetworkUtils; import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.ClientAuth; import org.apache.nifi.security.util.ClientAuth;
@ -26,7 +27,6 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils; import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -76,7 +76,7 @@ public class TestListenTCP {
} }
@Test @Test
public void testListenTCP() throws IOException { public void testRun() throws IOException {
final List<String> messages = new ArrayList<>(); final List<String> messages = new ArrayList<>();
messages.add("This is message 1\n"); messages.add("This is message 1\n");
messages.add("This is message 2\n"); messages.add("This is message 2\n");
@ -93,7 +93,7 @@ public class TestListenTCP {
} }
@Test @Test
public void testListenTCPBatching() throws IOException { public void testRunBatching() throws IOException {
runner.setProperty(ListenTCP.MAX_BATCH_SIZE, "3"); runner.setProperty(ListenTCP.MAX_BATCH_SIZE, "3");
final List<String> messages = new ArrayList<>(); final List<String> messages = new ArrayList<>();
@ -115,7 +115,7 @@ public class TestListenTCP {
} }
@Test @Test
public void testTLSClientAuthRequiredAndClientCertProvided() throws IOException, InitializationException { public void testRunClientAuthRequired() throws IOException, InitializationException {
runner.setProperty(ListenTCP.CLIENT_AUTH, ClientAuth.REQUIRED.name()); runner.setProperty(ListenTCP.CLIENT_AUTH, ClientAuth.REQUIRED.name());
enableSslContextService(keyStoreSslContext); enableSslContextService(keyStoreSslContext);
@ -135,24 +135,7 @@ public class TestListenTCP {
} }
@Test @Test
public void testTLSClientAuthRequiredAndClientCertNotProvided() throws InitializationException { public void testRunClientAuthNone() throws IOException, InitializationException {
runner.setProperty(ListenTCP.CLIENT_AUTH, ClientAuth.REQUIRED.name());
enableSslContextService(keyStoreSslContext);
final List<String> messages = new ArrayList<>();
messages.add("This is message 1\n");
messages.add("This is message 2\n");
messages.add("This is message 3\n");
messages.add("This is message 4\n");
messages.add("This is message 5\n");
Assert.assertThrows(IOException.class, () ->
run(messages, messages.size(), trustStoreSslContext)
);
}
@Test
public void testTLSClientAuthNoneAndClientCertNotProvided() throws IOException, InitializationException {
runner.setProperty(ListenTCP.CLIENT_AUTH, ClientAuth.NONE.name()); runner.setProperty(ListenTCP.CLIENT_AUTH, ClientAuth.NONE.name());
enableSslContextService(keyStoreSslContext); enableSslContextService(keyStoreSslContext);
@ -180,11 +163,11 @@ public class TestListenTCP {
// Run Processor and start Dispatcher without shutting down // Run Processor and start Dispatcher without shutting down
runner.run(1, false, true); runner.run(1, false, true);
final String message = StringUtils.join(messages, null);
final byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
try (final Socket socket = getSocket(port, sslContext)) { try (final Socket socket = getSocket(port, sslContext)) {
final OutputStream outputStream = socket.getOutputStream(); final OutputStream outputStream = socket.getOutputStream();
for (final String message : messages) { outputStream.write(bytes);
outputStream.write(message.getBytes(StandardCharsets.UTF_8));
}
outputStream.flush(); outputStream.flush();
// Run Processor for number of responses // Run Processor for number of responses

View File

@ -20,11 +20,11 @@ package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.ArrayUtils;
import org.apache.nifi.processors.standard.util.TCPTestServer; import org.apache.nifi.processors.standard.util.TCPTestServer;
import org.apache.nifi.security.util.KeyStoreUtils; import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.TlsConfiguration; import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -34,8 +34,6 @@ import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocketFactory; import javax.net.ssl.SSLServerSocketFactory;
import java.net.InetAddress; import java.net.InetAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List; import java.util.List;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
@ -82,7 +80,7 @@ public class TestPutTCP {
@After @After
public void cleanup() { public void cleanup() {
runner.shutdown(); runner.shutdown();
removeTestServer(server); shutdownServer();
} }
@Test @Test
@ -106,15 +104,14 @@ public class TestPutTCP {
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false); configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
sendTestData(VALID_FILES); sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES); assertMessagesReceived(VALID_FILES);
assertEquals("Server Connections not matched", server.getTotalNumConnections(), 1); assertServerConnections(1);
} }
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD) @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
public void testRunSuccessSslContextService() throws Exception { public void testRunSuccessSslContextService() throws Exception {
final TlsConfiguration tlsConfiguration = KeyStoreUtils.createTlsConfigAndNewKeystoreTruststore(); final TlsConfiguration tlsConfiguration = KeyStoreUtils.createTlsConfigAndNewKeystoreTruststore();
try { final SSLContext sslContext = SslContextUtils.createSslContext(tlsConfiguration);
final SSLContext sslContext = SslContextFactory.createSslContext(tlsConfiguration);
assertNotNull("SSLContext not found", sslContext); assertNotNull("SSLContext not found", sslContext);
final String identifier = SSLContextService.class.getName(); final String identifier = SSLContextService.class.getName();
@ -130,11 +127,7 @@ public class TestPutTCP {
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false); configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
sendTestData(VALID_FILES); sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES); assertMessagesReceived(VALID_FILES);
assertEquals("Server Connections not matched", server.getTotalNumConnections(), 1); assertServerConnections(1);
} finally {
Files.deleteIfExists(Paths.get(tlsConfiguration.getKeystorePath()));
Files.deleteIfExists(Paths.get(tlsConfiguration.getTruststorePath()));
}
} }
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
@ -143,7 +136,7 @@ public class TestPutTCP {
configureProperties(TCP_SERVER_ADDRESS_EL, OUTGOING_MESSAGE_DELIMITER, false); configureProperties(TCP_SERVER_ADDRESS_EL, OUTGOING_MESSAGE_DELIMITER, false);
sendTestData(VALID_FILES); sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES); assertMessagesReceived(VALID_FILES);
assertEquals("Server Connections not matched", server.getTotalNumConnections(), 1); assertServerConnections(1);
} }
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
@ -153,14 +146,14 @@ public class TestPutTCP {
sendTestData(VALID_FILES); sendTestData(VALID_FILES);
assertTransfers(VALID_FILES.length); assertTransfers(VALID_FILES.length);
assertMessagesReceived(VALID_FILES); assertMessagesReceived(VALID_FILES);
assertEquals("Server Connections not matched", server.getTotalNumConnections(), 1);
runner.setProperty(PutTCP.IDLE_EXPIRATION, "500 ms"); runner.setProperty(PutTCP.IDLE_EXPIRATION, "500 ms");
Thread.sleep(1000); Thread.sleep(1000);
runner.run(1, false, false); runner.run(1, false, false);
runner.clearTransferState(); runner.clearTransferState();
sendTestData(VALID_FILES); sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES); assertMessagesReceived(VALID_FILES);
assertEquals("Server Connections after prune senders not matched", server.getTotalNumConnections(), 2); assertServerConnections(2);
} }
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
@ -169,7 +162,7 @@ public class TestPutTCP {
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false); configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false);
sendTestData(VALID_FILES); sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES); assertMessagesReceived(VALID_FILES);
assertEquals("Server Connections not matched", server.getTotalNumConnections(), 1); assertServerConnections(1);
} }
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD) @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
@ -178,7 +171,7 @@ public class TestPutTCP {
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true); configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
sendTestData(VALID_FILES); sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES); assertMessagesReceived(VALID_FILES);
assertEquals("Server Connections not matched", server.getTotalNumConnections(), VALID_FILES.length); assertServerConnections(VALID_FILES.length);
} }
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
@ -187,19 +180,17 @@ public class TestPutTCP {
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false); configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
sendTestData(VALID_FILES); sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES); assertMessagesReceived(VALID_FILES);
assertEquals("Server Connections not matched", server.getTotalNumConnections(), 1);
removeTestServer(server); shutdownServer();
runner.clearTransferState();
sendTestData(VALID_FILES); sendTestData(VALID_FILES);
Thread.sleep(500); Thread.sleep(500);
assertNull("Unexpected Data Received", received.poll());
runner.assertQueueEmpty(); runner.assertQueueEmpty();
assertEquals("Server Connections after restart not matched", server.getTotalNumConnections(), 1);
createTestServer(OUTGOING_MESSAGE_DELIMITER); createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false); configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
sendTestData(VALID_FILES); sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES); assertMessagesReceived(VALID_FILES);
assertEquals("Server Connections not matched", server.getTotalNumConnections(), 1); assertServerConnections(1);
} }
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
@ -207,9 +198,9 @@ public class TestPutTCP {
createTestServer(OUTGOING_MESSAGE_DELIMITER); createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false); configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
sendTestData(EMPTY_FILE); sendTestData(EMPTY_FILE);
assertTransfers(EMPTY_FILE.length); assertTransfers(1);
runner.assertQueueEmpty(); runner.assertQueueEmpty();
assertEquals("Server Connections not matched", server.getTotalNumConnections(), 1); assertServerConnections(1);
} }
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD) @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
@ -219,7 +210,7 @@ public class TestPutTCP {
final String[] testData = createContent(VALID_LARGE_FILE_SIZE); final String[] testData = createContent(VALID_LARGE_FILE_SIZE);
sendTestData(testData); sendTestData(testData);
assertMessagesReceived(testData); assertMessagesReceived(testData);
assertEquals("Server Connections not matched", server.getTotalNumConnections(), testData.length); assertServerConnections(testData.length);
} }
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD) @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
@ -230,7 +221,7 @@ public class TestPutTCP {
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false); configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT); sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT);
assertMessagesReceived(testData, LOAD_TEST_ITERATIONS); assertMessagesReceived(testData, LOAD_TEST_ITERATIONS);
assertEquals("Server Connections not matched", server.getTotalNumConnections(), 1); assertServerConnections(1);
} }
private void createTestServer(final String delimiter) throws Exception { private void createTestServer(final String delimiter) throws Exception {
@ -247,7 +238,7 @@ public class TestPutTCP {
port = server.getPort(); port = server.getPort();
} }
private void removeTestServer(final TCPTestServer server) { private void shutdownServer() {
if (server != null) { if (server != null) {
server.shutdown(); server.shutdown();
} }
@ -291,9 +282,9 @@ public class TestPutTCP {
private void assertMessagesReceived(final String[] sentData, final int iterations) throws Exception { private void assertMessagesReceived(final String[] sentData, final int iterations) throws Exception {
for (int i = 0; i < iterations; i++) { for (int i = 0; i < iterations; i++) {
for (String item : sentData) { for (String item : sentData) {
List<Byte> message = received.take(); final List<Byte> message = received.take();
assertNotNull(String.format("Message [%d] not found", i), message); assertNotNull(String.format("Message [%d] not found", i), message);
Byte[] messageBytes = new Byte[message.size()]; final Byte[] messageBytes = new Byte[message.size()];
assertArrayEquals(item.getBytes(), ArrayUtils.toPrimitive(message.toArray(messageBytes))); assertArrayEquals(item.getBytes(), ArrayUtils.toPrimitive(message.toArray(messageBytes)));
} }
} }
@ -304,6 +295,12 @@ public class TestPutTCP {
assertNull("Unexpected Message Found", received.poll()); assertNull("Unexpected Message Found", received.poll());
} }
private void assertServerConnections(final int connections) {
// Shutdown server to get completed number of connections
shutdownServer();
assertEquals("Server Connections not matched", server.getTotalNumConnections(), connections);
}
private String[] createContent(final int size) { private String[] createContent(final int size) {
final char[] content = new char[size]; final char[] content = new char[size];

View File

@ -16,15 +16,37 @@
*/ */
package org.apache.nifi.web.util.ssl; package org.apache.nifi.web.util.ssl;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.security.util.KeystoreType; import org.apache.nifi.security.util.KeystoreType;
import org.apache.nifi.security.util.SslContextFactory; import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.StandardTlsConfiguration; import org.apache.nifi.security.util.StandardTlsConfiguration;
import org.apache.nifi.security.util.TlsConfiguration; import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.security.util.TlsException; import org.apache.nifi.security.util.TlsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import java.io.File;
import java.security.Security;
public class SslContextUtils { public class SslContextUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(SslContextUtils.class);
private static final String TLS_DISABLED_ALGORITHMS_PROPERTY = "jdk.tls.disabledAlgorithms";
private static final String DISABLED_ALGORITHMS = "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon, NULL, include jdk.disabled.namedCurves";
static {
final String disabledAlgorithms = Security.getProperty(TLS_DISABLED_ALGORITHMS_PROPERTY);
if (DISABLED_ALGORITHMS.equals(disabledAlgorithms)) {
LOGGER.debug("Found Expected Default TLS Disabled Algorithms: {}", DISABLED_ALGORITHMS);
} else {
LOGGER.warn("Found System Default TLS Disabled Algorithms: {}", disabledAlgorithms);
LOGGER.warn("Setting TLS Disabled Algorithms: {}", DISABLED_ALGORITHMS);
Security.setProperty(TLS_DISABLED_ALGORITHMS_PROPERTY, DISABLED_ALGORITHMS);
}
}
private static final String KEYSTORE_PATH = "src/test/resources/keystore.jks"; private static final String KEYSTORE_PATH = "src/test/resources/keystore.jks";
private static final String KEYSTORE_AND_TRUSTSTORE_PASSWORD = "passwordpassword"; private static final String KEYSTORE_AND_TRUSTSTORE_PASSWORD = "passwordpassword";
@ -72,4 +94,31 @@ public class SslContextUtils {
public static SSLContext createTrustStoreSslContext() throws TlsException { public static SSLContext createTrustStoreSslContext() throws TlsException {
return SslContextFactory.createSslContext(TRUSTSTORE_TLS_CONFIGURATION); return SslContextFactory.createSslContext(TRUSTSTORE_TLS_CONFIGURATION);
} }
/**
* Create SSLContext using Keystore and Truststore with deleteOnExit() for files
*
* @param tlsConfiguration TLS Configuration
* @return SSLContext configured with generated Keystore and Truststore
* @throws TlsException Thrown on SslContextFactory.createSslContext()
*/
public static SSLContext createSslContext(final TlsConfiguration tlsConfiguration) throws TlsException {
final String keystorePath = tlsConfiguration.getKeystorePath();
if (StringUtils.isNotBlank(keystorePath)) {
final File keystoreFile = new File(keystorePath);
keystoreFile.deleteOnExit();
}
final String truststorePath = tlsConfiguration.getTruststorePath();
if (StringUtils.isNotBlank(truststorePath)) {
final File truststoreFile = new File(truststorePath);
truststoreFile.deleteOnExit();
}
final SSLContext sslContext = SslContextFactory.createSslContext(tlsConfiguration);
if (sslContext == null) {
throw new TlsException(String.format("Failed to create SSLContext from Configuration %s", tlsConfiguration));
}
return sslContext;
}
} }