diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 63d1c9f49a..f3fd3e81e5 100644 --- a/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -66,6 +66,8 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.ProvenanceReporter; import org.apache.nifi.reporting.InitializationException; import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class StandardProcessorTestRunner implements TestRunner { @@ -80,6 +82,7 @@ public class StandardProcessorTestRunner implements TestRunner { private int numThreads = 1; private final AtomicInteger invocations = new AtomicInteger(0); + private static final Logger logger = LoggerFactory.getLogger(StandardProcessorTestRunner.class); private static final Set> deprecatedTypeAnnotations = new HashSet<>(); private static final Set> deprecatedMethodAnnotations = new HashSet<>(); @@ -134,14 +137,14 @@ public class StandardProcessorTestRunner implements TestRunner { private static void detectDeprecatedAnnotations(final Processor processor) { for ( final Class annotationClass : deprecatedTypeAnnotations ) { if ( processor.getClass().isAnnotationPresent(annotationClass) ) { - Assert.fail("Processor is using deprecated Annotation " + annotationClass.getCanonicalName()); + logger.warn("Processor is using deprecated Annotation " + annotationClass.getCanonicalName()); } } for ( final Class annotationClass : deprecatedMethodAnnotations ) { for ( final Method method : processor.getClass().getMethods() ) { if ( method.isAnnotationPresent(annotationClass) ) { - Assert.fail("Processor is using deprecated Annotation " + annotationClass.getCanonicalName() + " for method " + method); + logger.warn("Processor is using deprecated Annotation " + annotationClass.getCanonicalName() + " for method " + method); } } } diff --git a/nifi/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java b/nifi/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java index 37bcf2337a..a561982479 100644 --- a/nifi/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java +++ b/nifi/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java @@ -20,8 +20,10 @@ import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; +import org.junit.Ignore; import org.junit.Test; +@Ignore("This should not be enabled until we actually fail processor unit tests for using deprecated methods, which should happen in 0.1.0") public class TestStandardProcessorTestRunner { @Test(expected=AssertionError.class) diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java index fd486b0efd..f8a33bc8e7 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java @@ -68,6 +68,7 @@ import org.apache.http.conn.HttpClientConnectionManager; import org.apache.http.conn.ManagedHttpClientConnection; import org.apache.http.conn.socket.ConnectionSocketFactory; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.conn.ssl.SSLContextBuilder; import org.apache.http.conn.ssl.SSLContexts; import org.apache.http.conn.ssl.TrustSelfSignedStrategy; import org.apache.http.entity.ContentProducer; @@ -352,21 +353,26 @@ public class PostHTTP extends AbstractProcessor { private SSLContext createSSLContext(final SSLContextService service) throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, KeyManagementException, UnrecoverableKeyException { - final KeyStore truststore = KeyStore.getInstance(service.getTrustStoreType()); - try (final InputStream in = new FileInputStream(new File(service.getTrustStoreFile()))) { - truststore.load(in, service.getTrustStorePassword().toCharArray()); + SSLContextBuilder builder = SSLContexts.custom(); + final String trustFilename = service.getTrustStoreFile(); + if ( trustFilename != null ) { + final KeyStore truststore = KeyStore.getInstance(service.getTrustStoreType()); + try (final InputStream in = new FileInputStream(new File(service.getTrustStoreFile()))) { + truststore.load(in, service.getTrustStorePassword().toCharArray()); + } + builder = builder.loadTrustMaterial(truststore, new TrustSelfSignedStrategy()); + } + + final String keyFilename = service.getKeyStoreFile(); + if ( keyFilename != null ) { + final KeyStore keystore = KeyStore.getInstance(service.getKeyStoreType()); + try (final InputStream in = new FileInputStream(new File(service.getKeyStoreFile()))) { + keystore.load(in, service.getKeyStorePassword().toCharArray()); + } + builder = builder.loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray()); } - final KeyStore keystore = KeyStore.getInstance(service.getKeyStoreType()); - try (final InputStream in = new FileInputStream(new File(service.getKeyStoreFile()))) { - keystore.load(in, service.getKeyStorePassword().toCharArray()); - } - - SSLContext sslContext = SSLContexts.custom() - .loadTrustMaterial(truststore, new TrustSelfSignedStrategy()) - .loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray()) - .build(); - + SSLContext sslContext = builder.build(); return sslContext; } diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java new file mode 100644 index 0000000000..cf4f609c32 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/CaptureServlet.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.IOException; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.Response.Status; + +import org.apache.activemq.util.ByteArrayOutputStream; +import org.apache.nifi.stream.io.StreamUtils; + +public class CaptureServlet extends HttpServlet { + private static final long serialVersionUID = 8402271018449653919L; + + private volatile byte[] lastPost; + + public byte[] getLastPost() { + return lastPost; + } + + @Override + protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + StreamUtils.copy(request.getInputStream(), baos); + this.lastPost = baos.toByteArray(); + + response.setStatus(Status.OK.getStatusCode()); + } + + @Override + protected void doHead(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { + response.setHeader("Accept", "application/flowfile-v3,application/flowfile-v2"); + response.setHeader("x-nifi-transfer-protocol-version", "1"); + response.setHeader("Accept-Encoding", "gzip"); + } +} diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java new file mode 100644 index 0000000000..dcda136774 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPostHTTP.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.StandardSSLContextService; +import org.apache.nifi.util.FlowFileUnpackagerV3; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.eclipse.jetty.servlet.ServletHandler; +import org.junit.After; +import org.junit.Test; + +public class TestPostHTTP { + private TestServer server; + private TestRunner runner; + private CaptureServlet servlet; + + private void setup(final Map sslProperties) throws Exception { + // set up web service + ServletHandler handler = new ServletHandler(); + handler.addServletWithMapping(CaptureServlet.class, "/*"); + servlet = (CaptureServlet) handler.getServlets()[0].getServlet(); + + // create the service + server = new TestServer(sslProperties); + server.addHandler(handler); + server.startServer(); + + runner = TestRunners.newTestRunner(PostHTTP.class); + } + + @After + public void cleanup() throws Exception { + if (server != null) { + server.shutdownServer(); + server = null; + } + } + + @Test + public void testTruststoreSSLOnly() throws Exception { + final Map sslProps = new HashMap<>(); + sslProps.put(TestServer.NEED_CLIENT_AUTH, "false"); + sslProps.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); + sslProps.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); + sslProps.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); + setup(sslProps); + + final SSLContextService sslContextService = new StandardSSLContextService(); + runner.addControllerService("ssl-context", sslContextService); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); + runner.enableControllerService(sslContextService); + + runner.setProperty(PostHTTP.URL, server.getSecureUrl()); + runner.setProperty(PostHTTP.SSL_CONTEXT_SERVICE, "ssl-context"); + + runner.enqueue("Hello world".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS, 1); + } + + + @Test + public void testTwoWaySSL() throws Exception { + final Map sslProps = new HashMap<>(); + sslProps.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); + sslProps.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); + sslProps.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); + sslProps.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); + sslProps.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); + sslProps.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); + sslProps.put(TestServer.NEED_CLIENT_AUTH, "true"); + setup(sslProps); + + final SSLContextService sslContextService = new StandardSSLContextService(); + runner.addControllerService("ssl-context", sslContextService); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS"); + runner.enableControllerService(sslContextService); + + runner.setProperty(PostHTTP.URL, server.getSecureUrl()); + runner.setProperty(PostHTTP.SSL_CONTEXT_SERVICE, "ssl-context"); + + runner.enqueue("Hello world".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS, 1); + } + + @Test + public void testOneWaySSLWhenServerConfiguredForTwoWay() throws Exception { + final Map sslProps = new HashMap<>(); + sslProps.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); + sslProps.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); + sslProps.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); + sslProps.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); + sslProps.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); + sslProps.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); + sslProps.put(TestServer.NEED_CLIENT_AUTH, "true"); + setup(sslProps); + + final SSLContextService sslContextService = new StandardSSLContextService(); + runner.addControllerService("ssl-context", sslContextService); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); + runner.enableControllerService(sslContextService); + + runner.setProperty(PostHTTP.URL, server.getSecureUrl()); + runner.setProperty(PostHTTP.SSL_CONTEXT_SERVICE, "ssl-context"); + + runner.enqueue("Hello world".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(PostHTTP.REL_FAILURE, 1); + } + + @Test + public void testSendAsFlowFile() throws Exception { + setup(null); + runner.setProperty(PostHTTP.URL, server.getUrl()); + runner.setProperty(PostHTTP.SEND_AS_FLOWFILE, "true"); + + final Map attrs = new HashMap<>(); + attrs.put("abc", "cba"); + + runner.enqueue("Hello".getBytes(), attrs); + attrs.put("abc", "abc"); + attrs.put("filename", "xyz.txt"); + runner.enqueue("World".getBytes(), attrs); + + runner.run(1); + runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); + + final byte[] lastPost = servlet.getLastPost(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final ByteArrayInputStream bais = new ByteArrayInputStream(lastPost); + + FlowFileUnpackagerV3 unpacker = new FlowFileUnpackagerV3(); + + // unpack first flowfile received + Map receivedAttrs = unpacker.unpackageFlowFile(bais, baos); + byte[] contentReceived = baos.toByteArray(); + assertEquals("Hello", new String(contentReceived)); + assertEquals("cba", receivedAttrs.get("abc")); + + assertTrue( unpacker.hasMoreData() ); + + baos.reset(); + receivedAttrs = unpacker.unpackageFlowFile(bais, baos); + contentReceived = baos.toByteArray(); + + assertEquals("World", new String(contentReceived)); + assertEquals("abc", receivedAttrs.get("abc")); + assertEquals("xyz.txt", receivedAttrs.get("filename")); + } + + + @Test + public void testSendAsFlowFileSecure() throws Exception { + final Map sslProps = new HashMap<>(); + sslProps.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); + sslProps.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); + sslProps.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); + sslProps.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); + sslProps.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); + sslProps.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); + sslProps.put(TestServer.NEED_CLIENT_AUTH, "true"); + setup(sslProps); + + final SSLContextService sslContextService = new StandardSSLContextService(); + runner.addControllerService("ssl-context", sslContextService); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS"); + runner.enableControllerService(sslContextService); + + runner.setProperty(PostHTTP.URL, server.getSecureUrl()); + runner.setProperty(PostHTTP.SEND_AS_FLOWFILE, "true"); + runner.setProperty(PostHTTP.SSL_CONTEXT_SERVICE, "ssl-context"); + + final Map attrs = new HashMap<>(); + attrs.put("abc", "cba"); + + runner.enqueue("Hello".getBytes(), attrs); + attrs.put("abc", "abc"); + attrs.put("filename", "xyz.txt"); + runner.enqueue("World".getBytes(), attrs); + + runner.run(1); + runner.assertAllFlowFilesTransferred(PostHTTP.REL_SUCCESS); + + final byte[] lastPost = servlet.getLastPost(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final ByteArrayInputStream bais = new ByteArrayInputStream(lastPost); + + FlowFileUnpackagerV3 unpacker = new FlowFileUnpackagerV3(); + + // unpack first flowfile received + Map receivedAttrs = unpacker.unpackageFlowFile(bais, baos); + byte[] contentReceived = baos.toByteArray(); + assertEquals("Hello", new String(contentReceived)); + assertEquals("cba", receivedAttrs.get("abc")); + + assertTrue( unpacker.hasMoreData() ); + + baos.reset(); + receivedAttrs = unpacker.unpackageFlowFile(bais, baos); + contentReceived = baos.toByteArray(); + + assertEquals("World", new String(contentReceived)); + assertEquals("abc", receivedAttrs.get("abc")); + assertEquals("xyz.txt", receivedAttrs.get("filename")); + } + +} diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestServer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestServer.java index 53b79e642b..abdff6e6be 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestServer.java @@ -28,6 +28,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory; * Test server to assist with unit tests that requires a server to be stood up. */ public class TestServer { + public static final String NEED_CLIENT_AUTH = "clientAuth"; private Server jetty; private boolean secure = false; @@ -85,13 +86,25 @@ public class TestServer { */ private void createSecureConnector(final Map sslProperties) { SslContextFactory ssl = new SslContextFactory(); - ssl.setKeyStorePath(sslProperties.get(StandardSSLContextService.KEYSTORE.getName())); - ssl.setKeyStorePassword(sslProperties.get(StandardSSLContextService.KEYSTORE_PASSWORD.getName())); - ssl.setKeyStoreType(sslProperties.get(StandardSSLContextService.KEYSTORE_TYPE.getName())); - ssl.setTrustStorePath(sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName())); - ssl.setTrustStorePassword(sslProperties.get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName())); - ssl.setTrustStoreType(sslProperties.get(StandardSSLContextService.TRUSTSTORE_TYPE.getName())); - ssl.setNeedClientAuth(true); + + if ( sslProperties.get(StandardSSLContextService.KEYSTORE.getName()) != null ) { + ssl.setKeyStorePath(sslProperties.get(StandardSSLContextService.KEYSTORE.getName())); + ssl.setKeyStorePassword(sslProperties.get(StandardSSLContextService.KEYSTORE_PASSWORD.getName())); + ssl.setKeyStoreType(sslProperties.get(StandardSSLContextService.KEYSTORE_TYPE.getName())); + } + + if ( sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()) != null ) { + ssl.setTrustStorePath(sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName())); + ssl.setTrustStorePassword(sslProperties.get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName())); + ssl.setTrustStoreType(sslProperties.get(StandardSSLContextService.TRUSTSTORE_TYPE.getName())); + } + + final String clientAuth = sslProperties.get(NEED_CLIENT_AUTH); + if ( clientAuth == null ) { + ssl.setNeedClientAuth(true); + } else { + ssl.setNeedClientAuth(Boolean.parseBoolean(clientAuth)); + } // build the connector final ServerConnector https = new ServerConnector(jetty, ssl);