mirror of https://github.com/apache/nifi.git
NIFI-239: Updated deprecated methods and classes for GetHTTP
Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
parent
002926882b
commit
5911342766
|
@ -20,8 +20,15 @@ import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
import java.security.KeyManagementException;
|
||||||
|
import java.security.KeyStore;
|
||||||
|
import java.security.KeyStoreException;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
import java.security.UnrecoverableKeyException;
|
||||||
|
import java.security.cert.CertificateException;
|
||||||
import java.text.SimpleDateFormat;
|
import java.text.SimpleDateFormat;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -42,6 +49,24 @@ import java.util.regex.Pattern;
|
||||||
|
|
||||||
import javax.net.ssl.SSLContext;
|
import javax.net.ssl.SSLContext;
|
||||||
|
|
||||||
|
import org.apache.http.Header;
|
||||||
|
import org.apache.http.HttpResponse;
|
||||||
|
import org.apache.http.auth.AuthScope;
|
||||||
|
import org.apache.http.auth.UsernamePasswordCredentials;
|
||||||
|
import org.apache.http.client.CredentialsProvider;
|
||||||
|
import org.apache.http.client.HttpClient;
|
||||||
|
import org.apache.http.client.config.RequestConfig;
|
||||||
|
import org.apache.http.client.methods.HttpGet;
|
||||||
|
import org.apache.http.config.Registry;
|
||||||
|
import org.apache.http.config.RegistryBuilder;
|
||||||
|
import org.apache.http.conn.HttpClientConnectionManager;
|
||||||
|
import org.apache.http.conn.socket.ConnectionSocketFactory;
|
||||||
|
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
|
||||||
|
import org.apache.http.conn.ssl.SSLContexts;
|
||||||
|
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
|
||||||
|
import org.apache.http.impl.client.BasicCredentialsProvider;
|
||||||
|
import org.apache.http.impl.client.HttpClientBuilder;
|
||||||
|
import org.apache.http.impl.conn.BasicHttpClientConnectionManager;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.ValidationContext;
|
import org.apache.nifi.components.ValidationContext;
|
||||||
import org.apache.nifi.components.ValidationResult;
|
import org.apache.nifi.components.ValidationResult;
|
||||||
|
@ -62,21 +87,6 @@ import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.ssl.SSLContextService;
|
import org.apache.nifi.ssl.SSLContextService;
|
||||||
import org.apache.nifi.ssl.SSLContextService.ClientAuth;
|
import org.apache.nifi.ssl.SSLContextService.ClientAuth;
|
||||||
import org.apache.nifi.util.StopWatch;
|
import org.apache.nifi.util.StopWatch;
|
||||||
import org.apache.http.Header;
|
|
||||||
import org.apache.http.HttpResponse;
|
|
||||||
import org.apache.http.auth.AuthScope;
|
|
||||||
import org.apache.http.auth.UsernamePasswordCredentials;
|
|
||||||
import org.apache.http.client.HttpClient;
|
|
||||||
import org.apache.http.client.methods.HttpGet;
|
|
||||||
import org.apache.http.client.params.ClientPNames;
|
|
||||||
import org.apache.http.conn.ClientConnectionManager;
|
|
||||||
import org.apache.http.conn.scheme.Scheme;
|
|
||||||
import org.apache.http.conn.ssl.SSLSocketFactory;
|
|
||||||
import org.apache.http.impl.client.DefaultHttpClient;
|
|
||||||
import org.apache.http.impl.conn.BasicClientConnectionManager;
|
|
||||||
import org.apache.http.params.BasicHttpParams;
|
|
||||||
import org.apache.http.params.HttpConnectionParams;
|
|
||||||
import org.apache.http.params.HttpParams;
|
|
||||||
|
|
||||||
@Tags({"get", "fetch", "poll", "http", "https", "ingest", "source", "input"})
|
@Tags({"get", "fetch", "poll", "http", "https", "ingest", "source", "input"})
|
||||||
@CapabilityDescription("Fetches a file via HTTP")
|
@CapabilityDescription("Fetches a file via HTTP")
|
||||||
|
@ -263,6 +273,28 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
|
||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private SSLContext createSSLContext(final SSLContextService service) throws KeyStoreException, IOException, NoSuchAlgorithmException,
|
||||||
|
CertificateException, KeyManagementException, UnrecoverableKeyException
|
||||||
|
{
|
||||||
|
final KeyStore truststore = KeyStore.getInstance(service.getTrustStoreType());
|
||||||
|
try (final InputStream in = new FileInputStream(new File(service.getTrustStoreFile()))) {
|
||||||
|
truststore.load(in, service.getTrustStorePassword().toCharArray());
|
||||||
|
}
|
||||||
|
|
||||||
|
final KeyStore keystore = KeyStore.getInstance(service.getKeyStoreType());
|
||||||
|
try (final InputStream in = new FileInputStream(new File(service.getKeyStoreFile()))) {
|
||||||
|
keystore.load(in, service.getKeyStorePassword().toCharArray());
|
||||||
|
}
|
||||||
|
|
||||||
|
SSLContext sslContext = SSLContexts.custom()
|
||||||
|
.loadTrustMaterial(truststore, new TrustSelfSignedStrategy())
|
||||||
|
.loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
return sslContext;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
|
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
|
||||||
final ProcessorLog logger = getLogger();
|
final ProcessorLog logger = getLogger();
|
||||||
|
@ -274,6 +306,7 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
|
||||||
logger.warn("found FlowFile {} in input queue; transferring to success", new Object[]{incomingFlowFile});
|
logger.warn("found FlowFile {} in input queue; transferring to success", new Object[]{incomingFlowFile});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// get the URL
|
||||||
final String url = context.getProperty(URL).getValue();
|
final String url = context.getProperty(URL).getValue();
|
||||||
final URI uri;
|
final URI uri;
|
||||||
String source = url;
|
String source = url;
|
||||||
|
@ -283,32 +316,75 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
|
||||||
} catch (URISyntaxException swallow) {
|
} catch (URISyntaxException swallow) {
|
||||||
// this won't happen as the url has already been validated
|
// this won't happen as the url has already been validated
|
||||||
}
|
}
|
||||||
final ClientConnectionManager conMan = createConnectionManager(context);
|
|
||||||
|
// get the ssl context service
|
||||||
|
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||||
|
|
||||||
|
// create the connection manager
|
||||||
|
final HttpClientConnectionManager conMan;
|
||||||
|
if ( sslContextService == null ) {
|
||||||
|
conMan = new BasicHttpClientConnectionManager();
|
||||||
|
} else {
|
||||||
|
final SSLContext sslContext;
|
||||||
try {
|
try {
|
||||||
final HttpParams httpParams = new BasicHttpParams();
|
sslContext = createSSLContext(sslContextService);
|
||||||
HttpConnectionParams.setConnectionTimeout(httpParams, context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS)
|
} catch (final Exception e) {
|
||||||
.intValue());
|
throw new ProcessException(e);
|
||||||
HttpConnectionParams.setSoTimeout(httpParams, context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
|
|
||||||
httpParams.setBooleanParameter(ClientPNames.HANDLE_REDIRECTS, context.getProperty(FOLLOW_REDIRECTS).asBoolean());
|
|
||||||
final String userAgent = context.getProperty(USER_AGENT).getValue();
|
|
||||||
if (userAgent != null) {
|
|
||||||
httpParams.setParameter("http.useragent", userAgent);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final HttpClient client = new DefaultHttpClient(conMan, httpParams);
|
final SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, new String[] { "TLSv1" }, null,
|
||||||
|
SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER);
|
||||||
|
|
||||||
|
final Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
|
||||||
|
.register("https", sslsf).build();
|
||||||
|
|
||||||
|
conMan = new BasicHttpClientConnectionManager(socketFactoryRegistry);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// build the request configuration
|
||||||
|
final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
|
||||||
|
requestConfigBuilder.setConnectionRequestTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
|
||||||
|
requestConfigBuilder.setConnectTimeout(context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
|
||||||
|
requestConfigBuilder.setRedirectsEnabled(false);
|
||||||
|
requestConfigBuilder.setSocketTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
|
||||||
|
requestConfigBuilder.setRedirectsEnabled(context.getProperty(FOLLOW_REDIRECTS).asBoolean());
|
||||||
|
|
||||||
|
// build the http client
|
||||||
|
final HttpClientBuilder clientBuilder = HttpClientBuilder.create();
|
||||||
|
clientBuilder.setConnectionManager(conMan);
|
||||||
|
|
||||||
|
// include the user agent
|
||||||
|
final String userAgent = context.getProperty(USER_AGENT).getValue();
|
||||||
|
if (userAgent != null) {
|
||||||
|
clientBuilder.setUserAgent(userAgent);
|
||||||
|
}
|
||||||
|
|
||||||
|
// set the ssl context if necessary
|
||||||
|
if (sslContextService != null) {
|
||||||
|
clientBuilder.setSslcontext(sslContextService.createSSLContext(ClientAuth.REQUIRED));
|
||||||
|
}
|
||||||
|
|
||||||
final String username = context.getProperty(USERNAME).getValue();
|
final String username = context.getProperty(USERNAME).getValue();
|
||||||
final String password = context.getProperty(PASSWORD).getValue();
|
final String password = context.getProperty(PASSWORD).getValue();
|
||||||
|
|
||||||
|
// set the credentials if appropriate
|
||||||
if (username != null) {
|
if (username != null) {
|
||||||
|
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
||||||
if (password == null) {
|
if (password == null) {
|
||||||
((DefaultHttpClient) client).getCredentialsProvider().setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username));
|
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username));
|
||||||
} else {
|
} else {
|
||||||
((DefaultHttpClient) client).getCredentialsProvider().setCredentials(AuthScope.ANY,
|
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
|
||||||
new UsernamePasswordCredentials(username, password));
|
|
||||||
}
|
}
|
||||||
|
clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// create the http client
|
||||||
|
final HttpClient client = clientBuilder.build();
|
||||||
|
|
||||||
|
// create request
|
||||||
final HttpGet get = new HttpGet(url);
|
final HttpGet get = new HttpGet(url);
|
||||||
|
get.setConfig(requestConfigBuilder.build());
|
||||||
|
|
||||||
get.addHeader(HEADER_IF_MODIFIED_SINCE, lastModifiedRef.get());
|
get.addHeader(HEADER_IF_MODIFIED_SINCE, lastModifiedRef.get());
|
||||||
get.addHeader(HEADER_IF_NONE_MATCH, entityTagRef.get());
|
get.addHeader(HEADER_IF_NONE_MATCH, entityTagRef.get());
|
||||||
|
@ -401,47 +477,4 @@ public class GetHTTP extends AbstractSessionFactoryProcessor {
|
||||||
conMan.shutdown();
|
conMan.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private ClientConnectionManager createConnectionManager(final ProcessContext processContext) {
|
|
||||||
final String url = processContext.getProperty(URL).getValue();
|
|
||||||
final boolean secure = (url.toLowerCase().startsWith("https"));
|
|
||||||
URI uriObject;
|
|
||||||
try {
|
|
||||||
uriObject = new URI(url);
|
|
||||||
} catch (URISyntaxException e) {
|
|
||||||
throw new ProcessException(e); // will not happen because of our validators
|
|
||||||
}
|
|
||||||
int port = uriObject.getPort();
|
|
||||||
if (port == -1) {
|
|
||||||
port = 443;
|
|
||||||
}
|
|
||||||
|
|
||||||
final ClientConnectionManager conMan = new BasicClientConnectionManager();
|
|
||||||
if (secure) {
|
|
||||||
try {
|
|
||||||
final SSLContext context = createSslContext(processContext);
|
|
||||||
final SSLSocketFactory sslSocketFactory = new SSLSocketFactory(context);
|
|
||||||
final Scheme sslScheme = new Scheme("https", port, sslSocketFactory);
|
|
||||||
conMan.getSchemeRegistry().register(sslScheme);
|
|
||||||
} catch (final Exception e) {
|
|
||||||
getLogger().error("Unable to setup SSL connection due to ", e);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return conMan;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a SSL context based on the processor's optional properties.
|
|
||||||
* <p/>
|
|
||||||
*
|
|
||||||
* @return a SSLContext instance
|
|
||||||
* <p/>
|
|
||||||
* @throws ProcessingException if the context could not be created
|
|
||||||
*/
|
|
||||||
private SSLContext createSslContext(final ProcessContext context) {
|
|
||||||
final SSLContextService service = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
|
||||||
return (service == null) ? null : service.createSSLContext(ClientAuth.REQUIRED);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import javax.servlet.ServletException;
|
||||||
|
import javax.servlet.ServletOutputStream;
|
||||||
|
import javax.servlet.http.HttpServlet;
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
|
import org.apache.nifi.stream.io.StreamUtils;
|
||||||
|
|
||||||
|
public class HelloWorldServlet extends HttpServlet {
|
||||||
|
private static final long serialVersionUID = -8821242726929583763L;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doGet(final HttpServletRequest req, final HttpServletResponse resp) throws ServletException, IOException {
|
||||||
|
final ServletOutputStream out = resp.getOutputStream();
|
||||||
|
try (final FileInputStream fis = new FileInputStream("src/test/resources/hello.txt")) {
|
||||||
|
StreamUtils.copy(fis, out);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -16,24 +16,29 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
import org.apache.nifi.processors.standard.GetHTTP;
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||||
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
|
import org.apache.nifi.ssl.SSLContextService;
|
||||||
|
import org.apache.nifi.ssl.StandardSSLContextService;
|
||||||
|
import org.apache.nifi.util.MockControllerServiceInitializationContext;
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.MockProcessContext;
|
import org.apache.nifi.util.MockProcessContext;
|
||||||
import org.apache.nifi.util.MockProcessorInitializationContext;
|
import org.apache.nifi.util.MockProcessorInitializationContext;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
|
||||||
import org.eclipse.jetty.server.Server;
|
|
||||||
import org.eclipse.jetty.servlet.ServletHandler;
|
import org.eclipse.jetty.servlet.ServletHandler;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -73,9 +78,32 @@ public class TestGetHTTP {
|
||||||
assertTrue(confDir.delete());
|
assertTrue(confDir.delete());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Map<String, String> createSslProperties() {
|
||||||
|
Map<String, String> map = new HashMap<String, String>();
|
||||||
|
map.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks");
|
||||||
|
map.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest");
|
||||||
|
map.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS");
|
||||||
|
map.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks");
|
||||||
|
map.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest");
|
||||||
|
map.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public final void testContentModified() throws Exception {
|
public final void testContentModified() throws Exception {
|
||||||
String destination = "http://localhost:10203";
|
// set up web service
|
||||||
|
ServletHandler handler = new ServletHandler();
|
||||||
|
handler.addServletWithMapping(RESTServiceContentModified.class, "/*");
|
||||||
|
|
||||||
|
// create the service
|
||||||
|
TestServer server = new TestServer();
|
||||||
|
server.addHandler(handler);
|
||||||
|
|
||||||
|
try {
|
||||||
|
server.startServer();
|
||||||
|
|
||||||
|
// this is the base url with the random port
|
||||||
|
String destination = server.getUrl();
|
||||||
|
|
||||||
// set up NiFi mock controller
|
// set up NiFi mock controller
|
||||||
controller = TestRunners.newTestRunner(GetHTTP.class);
|
controller = TestRunners.newTestRunner(GetHTTP.class);
|
||||||
|
@ -83,13 +111,7 @@ public class TestGetHTTP {
|
||||||
controller.setProperty(GetHTTP.URL, destination);
|
controller.setProperty(GetHTTP.URL, destination);
|
||||||
controller.setProperty(GetHTTP.FILENAME, "testFile");
|
controller.setProperty(GetHTTP.FILENAME, "testFile");
|
||||||
controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
|
controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
|
||||||
// set up web service
|
|
||||||
Server server = new Server(10203);
|
|
||||||
ServletHandler handler = new ServletHandler();
|
|
||||||
server.setHandler(handler);
|
|
||||||
handler.addServletWithMapping(RESTServiceContentModified.class, "/*");
|
|
||||||
try {
|
|
||||||
server.start();
|
|
||||||
GetHTTP getHTTPProcessor = (GetHTTP) controller.getProcessor();
|
GetHTTP getHTTPProcessor = (GetHTTP) controller.getProcessor();
|
||||||
assertEquals("", getHTTPProcessor.entityTagRef.get());
|
assertEquals("", getHTTPProcessor.entityTagRef.get());
|
||||||
assertEquals("Thu, 01 Jan 1970 00:00:00 GMT", getHTTPProcessor.lastModifiedRef.get());
|
assertEquals("Thu, 01 Jan 1970 00:00:00 GMT", getHTTPProcessor.lastModifiedRef.get());
|
||||||
|
@ -150,8 +172,7 @@ public class TestGetHTTP {
|
||||||
|
|
||||||
// shutdown web service
|
// shutdown web service
|
||||||
} finally {
|
} finally {
|
||||||
server.stop();
|
server.shutdownServer();
|
||||||
server.destroy();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -163,20 +184,30 @@ public class TestGetHTTP {
|
||||||
for (File file : files) {
|
for (File file : files) {
|
||||||
assertTrue("Failed to delete " + file.getName(), file.delete());
|
assertTrue("Failed to delete " + file.getName(), file.delete());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set up web service
|
||||||
|
ServletHandler handler = new ServletHandler();
|
||||||
|
handler.addServletWithMapping(RESTServiceContentModified.class, "/*");
|
||||||
|
|
||||||
|
// create the service
|
||||||
|
TestServer server = new TestServer();
|
||||||
|
server.addHandler(handler);
|
||||||
|
|
||||||
|
try {
|
||||||
|
server.startServer();
|
||||||
|
|
||||||
|
// get the server url
|
||||||
|
String destination = server.getUrl();
|
||||||
|
|
||||||
// set up NiFi mock controller
|
// set up NiFi mock controller
|
||||||
controller = TestRunners.newTestRunner(GetHTTP.class);
|
controller = TestRunners.newTestRunner(GetHTTP.class);
|
||||||
controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs");
|
controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs");
|
||||||
controller.setProperty(GetHTTP.FILENAME, "testFile");
|
controller.setProperty(GetHTTP.FILENAME, "testFile");
|
||||||
String destination = "http://localhost:10203";
|
|
||||||
controller.setProperty(GetHTTP.URL, destination);
|
controller.setProperty(GetHTTP.URL, destination);
|
||||||
controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
|
controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
|
||||||
Server server = new Server(10203);
|
|
||||||
ServletHandler handler = new ServletHandler();
|
|
||||||
server.setHandler(handler);
|
|
||||||
handler.addServletWithMapping(RESTServiceContentModified.class, "/*");
|
|
||||||
try {
|
|
||||||
server.start();
|
|
||||||
GetHTTP getHTTPProcessor = (GetHTTP) controller.getProcessor();
|
GetHTTP getHTTPProcessor = (GetHTTP) controller.getProcessor();
|
||||||
|
|
||||||
assertEquals("", getHTTPProcessor.entityTagRef.get());
|
assertEquals("", getHTTPProcessor.entityTagRef.get());
|
||||||
assertEquals("Thu, 01 Jan 1970 00:00:00 GMT", getHTTPProcessor.lastModifiedRef.get());
|
assertEquals("Thu, 01 Jan 1970 00:00:00 GMT", getHTTPProcessor.lastModifiedRef.get());
|
||||||
controller.run(2);
|
controller.run(2);
|
||||||
|
@ -224,14 +255,24 @@ public class TestGetHTTP {
|
||||||
|
|
||||||
// shutdown web service
|
// shutdown web service
|
||||||
} finally {
|
} finally {
|
||||||
server.stop();
|
server.shutdownServer();
|
||||||
server.destroy();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public final void testUserAgent() throws Exception {
|
public final void testUserAgent() throws Exception {
|
||||||
String destination = "http://localhost:10203";
|
// set up web service
|
||||||
|
ServletHandler handler = new ServletHandler();
|
||||||
|
handler.addServletWithMapping(UserAgentTestingServlet.class, "/*");
|
||||||
|
|
||||||
|
// create the service
|
||||||
|
TestServer server = new TestServer();
|
||||||
|
server.addHandler(handler);
|
||||||
|
|
||||||
|
try {
|
||||||
|
server.startServer();
|
||||||
|
|
||||||
|
String destination = server.getUrl();
|
||||||
|
|
||||||
// set up NiFi mock controller
|
// set up NiFi mock controller
|
||||||
controller = TestRunners.newTestRunner(GetHTTP.class);
|
controller = TestRunners.newTestRunner(GetHTTP.class);
|
||||||
|
@ -239,13 +280,7 @@ public class TestGetHTTP {
|
||||||
controller.setProperty(GetHTTP.URL, destination);
|
controller.setProperty(GetHTTP.URL, destination);
|
||||||
controller.setProperty(GetHTTP.FILENAME, "testFile");
|
controller.setProperty(GetHTTP.FILENAME, "testFile");
|
||||||
controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
|
controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
|
||||||
// set up web service
|
|
||||||
Server server = new Server(10203);
|
|
||||||
ServletHandler handler = new ServletHandler();
|
|
||||||
server.setHandler(handler);
|
|
||||||
handler.addServletWithMapping(UserAgentTestingServlet.class, "/*");
|
|
||||||
try {
|
|
||||||
server.start();
|
|
||||||
controller.run();
|
controller.run();
|
||||||
controller.assertTransferCount(GetHTTP.REL_SUCCESS, 0);
|
controller.assertTransferCount(GetHTTP.REL_SUCCESS, 0);
|
||||||
|
|
||||||
|
@ -255,8 +290,65 @@ public class TestGetHTTP {
|
||||||
|
|
||||||
// shutdown web service
|
// shutdown web service
|
||||||
} finally {
|
} finally {
|
||||||
server.stop();
|
server.shutdownServer();
|
||||||
server.destroy();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Map<String, String> getSslProperties() {
|
||||||
|
Map<String, String> props = new HashMap<String, String>();
|
||||||
|
props.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks");
|
||||||
|
props.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest");
|
||||||
|
props.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS");
|
||||||
|
props.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks");
|
||||||
|
props.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest");
|
||||||
|
props.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
|
||||||
|
return props;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void useSSLContextService() {
|
||||||
|
final SSLContextService service = new StandardSSLContextService();
|
||||||
|
try {
|
||||||
|
controller.addControllerService("ssl-service", service, getSslProperties());
|
||||||
|
} catch (InitializationException ex) {
|
||||||
|
ex.printStackTrace();
|
||||||
|
Assert.fail("Could not create SSL Context Service");
|
||||||
|
}
|
||||||
|
|
||||||
|
controller.setProperty(GetHTTP.SSL_CONTEXT_SERVICE, "ssl-service");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public final void testSecure() throws Exception {
|
||||||
|
// set up web service
|
||||||
|
ServletHandler handler = new ServletHandler();
|
||||||
|
handler.addServletWithMapping(HelloWorldServlet.class, "/*");
|
||||||
|
|
||||||
|
// create the service
|
||||||
|
TestServer server = new TestServer(getSslProperties());
|
||||||
|
server.addHandler(handler);
|
||||||
|
|
||||||
|
try {
|
||||||
|
server.startServer();
|
||||||
|
|
||||||
|
String destination = server.getSecureUrl();
|
||||||
|
|
||||||
|
// set up NiFi mock controller
|
||||||
|
controller = TestRunners.newTestRunner(GetHTTP.class);
|
||||||
|
useSSLContextService();
|
||||||
|
|
||||||
|
controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs");
|
||||||
|
controller.setProperty(GetHTTP.URL, destination);
|
||||||
|
controller.setProperty(GetHTTP.FILENAME, "testFile");
|
||||||
|
controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, "application/json");
|
||||||
|
|
||||||
|
controller.run();
|
||||||
|
controller.assertAllFlowFilesTransferred(GetHTTP.REL_SUCCESS, 1);
|
||||||
|
final MockFlowFile mff = controller.getFlowFilesForRelationship(GetHTTP.REL_SUCCESS).get(0);
|
||||||
|
mff.assertContentEquals("Hello, World!");
|
||||||
|
} finally {
|
||||||
|
server.shutdownServer();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
import org.apache.nifi.processors.standard.InvokeHTTP;
|
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
@ -42,11 +41,7 @@ import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
import org.eclipse.jetty.server.Handler;
|
import org.eclipse.jetty.server.Handler;
|
||||||
import org.eclipse.jetty.server.Request;
|
import org.eclipse.jetty.server.Request;
|
||||||
import org.eclipse.jetty.server.Server;
|
|
||||||
import org.eclipse.jetty.server.ServerConnector;
|
|
||||||
import org.eclipse.jetty.server.handler.AbstractHandler;
|
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||||
import org.eclipse.jetty.server.handler.HandlerCollection;
|
|
||||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -57,7 +52,7 @@ import org.junit.Test;
|
||||||
public class TestInvokeHTTP {
|
public class TestInvokeHTTP {
|
||||||
|
|
||||||
private static Map<String, String> sslProperties;
|
private static Map<String, String> sslProperties;
|
||||||
private static Server server;
|
private static TestServer server;
|
||||||
private static String url;
|
private static String url;
|
||||||
|
|
||||||
private TestRunner runner;
|
private TestRunner runner;
|
||||||
|
@ -74,19 +69,15 @@ public class TestInvokeHTTP {
|
||||||
|
|
||||||
// create a Jetty server on a random port
|
// create a Jetty server on a random port
|
||||||
server = createServer();
|
server = createServer();
|
||||||
server.start();
|
server.startServer();
|
||||||
|
|
||||||
// we need the port to construct the base url
|
|
||||||
int port = ((ServerConnector) server.getConnectors()[0]).getLocalPort();
|
|
||||||
|
|
||||||
// this is the base url with the random port
|
// this is the base url with the random port
|
||||||
url = "https://localhost:" + port;
|
url = server.getSecureUrl();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void afterClass() throws Exception {
|
public static void afterClass() throws Exception {
|
||||||
server.stop();
|
server.shutdownServer();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@ -95,13 +86,7 @@ public class TestInvokeHTTP {
|
||||||
runner.addControllerService("ssl-context", new StandardSSLContextService(), sslProperties);
|
runner.addControllerService("ssl-context", new StandardSSLContextService(), sslProperties);
|
||||||
runner.setProperty(Config.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
|
runner.setProperty(Config.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
|
||||||
|
|
||||||
HandlerCollection hc = (HandlerCollection) server.getHandler();
|
server.clearHandlers();
|
||||||
Handler[] ha = hc.getHandlers();
|
|
||||||
if (ha != null) {
|
|
||||||
for (Handler h : ha) {
|
|
||||||
hc.removeHandler(h);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -110,7 +95,7 @@ public class TestInvokeHTTP {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addHandler(Handler handler) {
|
private void addHandler(Handler handler) {
|
||||||
((HandlerCollection) server.getHandler()).addHandler(handler);
|
server.addHandler(handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -537,23 +522,8 @@ public class TestInvokeHTTP {
|
||||||
return map;
|
return map;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Server createServer() throws IOException {
|
private static TestServer createServer() throws IOException {
|
||||||
SslContextFactory ssl = new SslContextFactory();
|
return new TestServer(sslProperties);
|
||||||
ssl.setKeyStorePath(sslProperties.get(StandardSSLContextService.KEYSTORE.getName()));
|
|
||||||
ssl.setKeyStorePassword(sslProperties.get(StandardSSLContextService.KEYSTORE_PASSWORD.getName()));
|
|
||||||
ssl.setKeyStoreType(sslProperties.get(StandardSSLContextService.KEYSTORE_TYPE.getName()));
|
|
||||||
ssl.setTrustStorePath(sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()));
|
|
||||||
ssl.setTrustStorePassword(sslProperties.get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()));
|
|
||||||
ssl.setTrustStoreType(sslProperties.get(StandardSSLContextService.TRUSTSTORE_TYPE.getName()));
|
|
||||||
|
|
||||||
ssl.setNeedClientAuth(true);
|
|
||||||
|
|
||||||
Server server = new Server();
|
|
||||||
|
|
||||||
server.addConnector(new ServerConnector(server, ssl));
|
|
||||||
server.setHandler(new HandlerCollection(true));
|
|
||||||
|
|
||||||
return server;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void createFlowFiles(final TestRunner testRunner) throws UnsupportedEncodingException {
|
private static void createFlowFiles(final TestRunner testRunner) throws UnsupportedEncodingException {
|
||||||
|
|
|
@ -0,0 +1,177 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import org.apache.nifi.ssl.StandardSSLContextService;
|
||||||
|
import org.eclipse.jetty.server.Handler;
|
||||||
|
import org.eclipse.jetty.server.Server;
|
||||||
|
import org.eclipse.jetty.server.ServerConnector;
|
||||||
|
import org.eclipse.jetty.server.handler.HandlerCollection;
|
||||||
|
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test server to assist with unit tests that requires a server to be stood up.
|
||||||
|
*/
|
||||||
|
public class TestServer {
|
||||||
|
|
||||||
|
private Server jetty;
|
||||||
|
private boolean secure = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates the test server.
|
||||||
|
*/
|
||||||
|
public TestServer() {
|
||||||
|
createServer(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates the test server.
|
||||||
|
*
|
||||||
|
* @param sslProperties SSLProps to be used in the secure connection. The keys
|
||||||
|
* should should use the StandardSSLContextService properties.
|
||||||
|
*/
|
||||||
|
public TestServer(final Map<String, String> sslProperties) {
|
||||||
|
createServer(sslProperties);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates the server.
|
||||||
|
*
|
||||||
|
* @param webappContext
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private void createServer(final Map<String, String> sslProperties) {
|
||||||
|
jetty = new Server();
|
||||||
|
|
||||||
|
// create the unsecure connector
|
||||||
|
createConnector();
|
||||||
|
|
||||||
|
// create the secure connector if sslProperties are specified
|
||||||
|
if (sslProperties != null) {
|
||||||
|
createSecureConnector(sslProperties);
|
||||||
|
}
|
||||||
|
|
||||||
|
jetty.setHandler(new HandlerCollection(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates the http connection
|
||||||
|
*/
|
||||||
|
private void createConnector() {
|
||||||
|
final ServerConnector http = new ServerConnector(jetty);
|
||||||
|
http.setPort(0);
|
||||||
|
jetty.addConnector(http);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates the https connector.
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
private void createSecureConnector(final Map<String, String> sslProperties) {
|
||||||
|
SslContextFactory ssl = new SslContextFactory();
|
||||||
|
ssl.setKeyStorePath(sslProperties.get(StandardSSLContextService.KEYSTORE.getName()));
|
||||||
|
ssl.setKeyStorePassword(sslProperties.get(StandardSSLContextService.KEYSTORE_PASSWORD.getName()));
|
||||||
|
ssl.setKeyStoreType(sslProperties.get(StandardSSLContextService.KEYSTORE_TYPE.getName()));
|
||||||
|
ssl.setTrustStorePath(sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()));
|
||||||
|
ssl.setTrustStorePassword(sslProperties.get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName()));
|
||||||
|
ssl.setTrustStoreType(sslProperties.get(StandardSSLContextService.TRUSTSTORE_TYPE.getName()));
|
||||||
|
ssl.setNeedClientAuth(true);
|
||||||
|
|
||||||
|
// build the connector
|
||||||
|
final ServerConnector https = new ServerConnector(jetty, ssl);
|
||||||
|
|
||||||
|
// set host and port
|
||||||
|
https.setPort(0);
|
||||||
|
|
||||||
|
// add the connector
|
||||||
|
jetty.addConnector(https);
|
||||||
|
|
||||||
|
// mark secure as enabled
|
||||||
|
secure = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void clearHandlers() {
|
||||||
|
HandlerCollection hc = (HandlerCollection) jetty.getHandler();
|
||||||
|
Handler[] ha = hc.getHandlers();
|
||||||
|
if (ha != null) {
|
||||||
|
for (Handler h : ha) {
|
||||||
|
hc.removeHandler(h);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addHandler(Handler handler) {
|
||||||
|
((HandlerCollection) jetty.getHandler()).addHandler(handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Starts the server.
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public void startServer() throws Exception {
|
||||||
|
jetty.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stops the server.
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public void shutdownServer() throws Exception {
|
||||||
|
jetty.stop();
|
||||||
|
jetty.destroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getPort() {
|
||||||
|
if (!jetty.isStarted()) {
|
||||||
|
throw new IllegalStateException("Jetty server not started");
|
||||||
|
}
|
||||||
|
return ((ServerConnector) jetty.getConnectors()[0]).getLocalPort();
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getSecurePort() {
|
||||||
|
if (!jetty.isStarted()) {
|
||||||
|
throw new IllegalStateException("Jetty server not started");
|
||||||
|
}
|
||||||
|
return ((ServerConnector) jetty.getConnectors()[1]).getLocalPort();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the url for the server.
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public String getUrl() {
|
||||||
|
return "http://localhost:" + getPort();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the secure url for the server.
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public String getSecureUrl() {
|
||||||
|
String url = null;
|
||||||
|
if (secure) {
|
||||||
|
url = "https://localhost:" + getSecurePort();
|
||||||
|
}
|
||||||
|
return url;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue