From d99180a7160ac8a189507168dc97a35e0501eef9 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 9 Jan 2015 13:18:00 -0500 Subject: [PATCH] NIFI-239: Updated PostHTTP to not use deprecated methods and classes in Apache HTTP Client --- .../nifi/processors/standard/PostHTTP.java | 227 +++++++++++------- 1 file changed, 136 insertions(+), 91 deletions(-) diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java index 14f8a28cf8..ee16610e37 100644 --- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java +++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java @@ -16,12 +16,18 @@ */ package org.apache.nifi.processors.standard; +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.MalformedURLException; -import java.net.URI; -import java.net.URISyntaxException; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -39,20 +45,45 @@ import java.util.regex.Pattern; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSession; import javax.security.cert.X509Certificate; import javax.servlet.http.HttpServletResponse; +import org.apache.http.Header; +import org.apache.http.HttpException; +import org.apache.http.HttpResponse; +import org.apache.http.HttpResponseInterceptor; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.client.HttpClient; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpHead; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.config.Registry; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.HttpClientConnectionManager; +import org.apache.http.conn.ManagedHttpClientConnection; +import org.apache.http.conn.socket.ConnectionSocketFactory; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.conn.ssl.SSLContexts; +import org.apache.http.conn.ssl.TrustSelfSignedStrategy; +import org.apache.http.entity.ContentProducer; +import org.apache.http.entity.EntityTemplate; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.protocol.HttpContext; +import org.apache.http.protocol.HttpCoreContext; +import org.apache.http.util.EntityUtils; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.stream.io.BufferedOutputStream; -import org.apache.nifi.stream.io.GZIPOutputStream; -import org.apache.nifi.stream.io.LeakyBucketStreamThrottler; -import org.apache.nifi.stream.io.StreamThrottler; -import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; @@ -69,7 +100,12 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.ssl.SSLContextService; -import org.apache.nifi.ssl.SSLContextService.ClientAuth; +import org.apache.nifi.stream.io.BufferedInputStream; +import org.apache.nifi.stream.io.BufferedOutputStream; +import org.apache.nifi.stream.io.GZIPOutputStream; +import org.apache.nifi.stream.io.LeakyBucketStreamThrottler; +import org.apache.nifi.stream.io.StreamThrottler; +import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.FlowFilePackager; import org.apache.nifi.util.FlowFilePackagerV1; import org.apache.nifi.util.FlowFilePackagerV2; @@ -78,31 +114,6 @@ import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.ObjectHolder; import org.apache.nifi.util.StopWatch; -import org.apache.http.Header; -import org.apache.http.HttpException; -import org.apache.http.HttpResponse; -import org.apache.http.HttpResponseInterceptor; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpDelete; -import org.apache.http.client.methods.HttpHead; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.params.ClientPNames; -import org.apache.http.conn.ClientConnectionManager; -import org.apache.http.conn.HttpRoutedConnection; -import org.apache.http.conn.scheme.Scheme; -import org.apache.http.conn.ssl.SSLSocketFactory; -import org.apache.http.entity.ContentProducer; -import org.apache.http.entity.EntityTemplate; -import org.apache.http.impl.client.AbstractHttpClient; -import org.apache.http.impl.client.DefaultHttpClient; -import org.apache.http.impl.conn.PoolingClientConnectionManager; -import org.apache.http.params.BasicHttpParams; -import org.apache.http.params.HttpConnectionParams; -import org.apache.http.params.HttpParams; -import org.apache.http.protocol.ExecutionContext; -import org.apache.http.protocol.HttpContext; -import org.apache.http.util.EntityUtils; - import com.sun.jersey.api.client.ClientResponse.Status; @SupportsBatching @@ -308,28 +319,64 @@ public class PostHTTP extends AbstractProcessor { return config; } - final ClientConnectionManager conMan = new PoolingClientConnectionManager(); - registerUrlWithManager(url, context, conMan); + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final SSLContext sslContext; + try { + sslContext = createSSLContext(sslContextService); + } catch (final Exception e) { + throw new ProcessException(e); + } + + final SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, new String[] { "TLSv1" }, null, + SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER); + + final Registry socketFactoryRegistry = RegistryBuilder.create() + .register("https", sslsf).build(); + + final PoolingHttpClientConnectionManager conMan = new PoolingHttpClientConnectionManager(socketFactoryRegistry); + conMan.setDefaultMaxPerRoute(context.getMaxConcurrentTasks()); + conMan.setMaxTotal(context.getMaxConcurrentTasks()); config = new Config(conMan); final Config existingConfig = configMap.putIfAbsent(baseUrl, config); return (existingConfig == null) ? config : existingConfig; } + + + private SSLContext createSSLContext(final SSLContextService service) throws KeyStoreException, IOException, NoSuchAlgorithmException, + CertificateException, KeyManagementException, UnrecoverableKeyException + { + final KeyStore truststore = KeyStore.getInstance(service.getTrustStoreType()); + try (final InputStream in = new FileInputStream(new File(service.getTrustStoreFile()))) { + truststore.load(in, service.getTrustStorePassword().toCharArray()); + } + + final KeyStore keystore = KeyStore.getInstance(service.getKeyStoreType()); + try (final InputStream in = new FileInputStream(new File(service.getKeyStoreFile()))) { + keystore.load(in, service.getKeyStorePassword().toCharArray()); + } + + SSLContext sslContext = SSLContexts.custom() + .loadTrustMaterial(truststore, new TrustSelfSignedStrategy()) + .loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray()) + .build(); + + return sslContext; + } @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { final boolean sendAsFlowFile = context.getProperty(SEND_AS_FLOWFILE).asBoolean(); final int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger(); - - final HttpParams httpParams = new BasicHttpParams(); - HttpConnectionParams.setConnectionTimeout(httpParams, context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); - HttpConnectionParams.setSoTimeout(httpParams, context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); - httpParams.setBooleanParameter(ClientPNames.HANDLE_REDIRECTS, false); final String userAgent = context.getProperty(USER_AGENT).getValue(); - if (userAgent != null) { - httpParams.setParameter("http.useragent", userAgent); - } + final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom(); + requestConfigBuilder.setConnectionRequestTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + requestConfigBuilder.setConnectTimeout(context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + requestConfigBuilder.setRedirectsEnabled(false); + requestConfigBuilder.setSocketTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + final RequestConfig requestConfig = requestConfigBuilder.build(); + final StreamThrottler throttler = throttlerRef.get(); final ProcessorLog logger = getLogger(); @@ -339,7 +386,7 @@ public class PostHTTP extends AbstractProcessor { final List toSend = new ArrayList<>(); DestinationAccepts destinationAccepts = null; - HttpClient client = null; + CloseableHttpClient client = null; final String transactionId = UUID.randomUUID().toString(); final ObjectHolder dnHolder = new ObjectHolder<>("none"); @@ -371,15 +418,20 @@ public class PostHTTP extends AbstractProcessor { if (client == null || destinationAccepts == null) { final Config config = getConfig(url, context); - final ClientConnectionManager conMan = config.getConnectionManager(); - client = new DefaultHttpClient(conMan, httpParams); - - ((AbstractHttpClient) client).addResponseInterceptor(new HttpResponseInterceptor() { + final HttpClientConnectionManager conMan = config.getConnectionManager(); + + final HttpClientBuilder clientBuilder = HttpClientBuilder.create(); + clientBuilder.setConnectionManager(conMan); + clientBuilder.setUserAgent(userAgent); + clientBuilder.addInterceptorFirst(new HttpResponseInterceptor() { @Override - public void process(final HttpResponse response, final HttpContext context) throws HttpException, IOException { - final HttpRoutedConnection httpRoutedConnection = (HttpRoutedConnection) context.getAttribute(ExecutionContext.HTTP_CONNECTION); - if (httpRoutedConnection.isSecure()) { - final X509Certificate[] certChain = httpRoutedConnection.getSSLSession().getPeerCertificateChain(); + public void process(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException { + HttpCoreContext coreContext = HttpCoreContext.adapt(httpContext); + ManagedHttpClientConnection conn = coreContext.getConnection(ManagedHttpClientConnection.class); + SSLSession sslSession = conn.getSSLSession(); + + if ( sslSession != null ) { + final X509Certificate[] certChain = sslSession.getPeerCertificateChain(); if (certChain == null || certChain.length == 0) { throw new SSLPeerUnverifiedException("No certificates found"); } @@ -389,6 +441,23 @@ public class PostHTTP extends AbstractProcessor { } } }); + + clientBuilder.disableAutomaticRetries(); + clientBuilder.disableContentCompression(); + + final String username = context.getProperty(USERNAME).getValue(); + final String password = context.getProperty(PASSWORD).getValue(); + // set the credentials if appropriate + if (username != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + if (password == null) { + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username)); + } else { + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); + }; + clientBuilder.setDefaultCredentialsProvider(credentialsProvider); + } + client = clientBuilder.build(); // determine whether or not destination accepts flowfile/gzip destinationAccepts = config.getDestinationAccepts(); @@ -491,7 +560,8 @@ public class PostHTTP extends AbstractProcessor { entity.setChunked(context.getProperty(CHUNKED_ENCODING).asBoolean()); post.setEntity(entity); - + post.setConfig(requestConfig); + final String contentType; if (sendAsFlowFile) { if (accepts.isFlowFileV3Accepted()) { @@ -537,7 +607,7 @@ public class PostHTTP extends AbstractProcessor { final String uploadDataRate; final long uploadMillis; - final HttpResponse response; + CloseableHttpResponse response = null; try { final StopWatch stopWatch = new StopWatch(true); response = client.execute(post); @@ -556,6 +626,14 @@ public class PostHTTP extends AbstractProcessor { session.transfer(flowFile, REL_FAILURE); } return; + } finally { + if ( response != null ) { + try { + response.close(); + } catch (IOException e) { + getLogger().warn("Failed to close HTTP Response due to {}", new Object[] {e}); + } + } } // If we get a 'SEE OTHER' status code and an HTTP header that indicates that the intent @@ -686,39 +764,6 @@ public class PostHTTP extends AbstractProcessor { } } - private void registerUrlWithManager(final String url, final ProcessContext processContext, final ClientConnectionManager conMan) { - URI uriObject; - try { - uriObject = new URI(url); - } catch (URISyntaxException e) { - throw new ProcessException(e); // won't happen because of our - } - int port = uriObject.getPort(); - if (port == -1) { - port = 443; - } - - final boolean secure = (url.toLowerCase().startsWith("https")); - if (!secure) { - return; - } - - final SSLContext sslContext = createSslContext(processContext); - final SSLSocketFactory sslSocketFactory = new SSLSocketFactory(sslContext); - final Scheme newScheme = new Scheme("https", port, sslSocketFactory); - conMan.getSchemeRegistry().register(newScheme); - } - - /** - * Creates a SSL context based on the processor's optional properties. - *

- * - * @return an SSLContext instance - */ - private SSLContext createSslContext(final ProcessContext context) { - final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - return (sslContextService == null) ? null : sslContextService.createSSLContext(ClientAuth.REQUIRED); - } private DestinationAccepts getDestinationAcceptance(final HttpClient client, final String uri, final ProcessorLog logger, final String transactionId) throws IOException { final HttpHead head = new HttpHead(uri); @@ -838,9 +883,9 @@ public class PostHTTP extends AbstractProcessor { private static class Config { private volatile DestinationAccepts destinationAccepts; - private final ClientConnectionManager conMan; + private final HttpClientConnectionManager conMan; - public Config(final ClientConnectionManager conMan) { + public Config(final HttpClientConnectionManager conMan) { this.conMan = conMan; } @@ -852,7 +897,7 @@ public class PostHTTP extends AbstractProcessor { this.destinationAccepts = destinationAccepts; } - public ClientConnectionManager getConnectionManager() { + public HttpClientConnectionManager getConnectionManager() { return conMan; } }