NIFI-239: Updated PostHTTP to not use deprecated methods and classes in Apache HTTP Client

This commit is contained in:
Mark Payne 2015-01-09 13:18:00 -05:00
parent 76ea1c64ff
commit d99180a716
1 changed files with 136 additions and 91 deletions

View File

@ -16,12 +16,18 @@
*/ */
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URI; import java.security.KeyManagementException;
import java.net.URISyntaxException; import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -39,20 +45,45 @@ import java.util.regex.Pattern;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.security.cert.X509Certificate; import javax.security.cert.X509Certificate;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.apache.http.Header;
import org.apache.http.HttpException;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.conn.ManagedHttpClientConnection;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLContexts;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.entity.ContentProducer;
import org.apache.http.entity.EntityTemplate;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpCoreContext;
import org.apache.http.util.EntityUtils;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.GZIPOutputStream;
import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
import org.apache.nifi.stream.io.StreamThrottler;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.DataUnit;
@ -69,7 +100,12 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth; import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.GZIPOutputStream;
import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
import org.apache.nifi.stream.io.StreamThrottler;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FlowFilePackager; import org.apache.nifi.util.FlowFilePackager;
import org.apache.nifi.util.FlowFilePackagerV1; import org.apache.nifi.util.FlowFilePackagerV1;
import org.apache.nifi.util.FlowFilePackagerV2; import org.apache.nifi.util.FlowFilePackagerV2;
@ -78,31 +114,6 @@ import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.ObjectHolder; import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import org.apache.http.Header;
import org.apache.http.HttpException;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.params.ClientPNames;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.HttpRoutedConnection;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.ssl.SSLSocketFactory;
import org.apache.http.entity.ContentProducer;
import org.apache.http.entity.EntityTemplate;
import org.apache.http.impl.client.AbstractHttpClient;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import com.sun.jersey.api.client.ClientResponse.Status; import com.sun.jersey.api.client.ClientResponse.Status;
@SupportsBatching @SupportsBatching
@ -308,28 +319,64 @@ public class PostHTTP extends AbstractProcessor {
return config; return config;
} }
final ClientConnectionManager conMan = new PoolingClientConnectionManager(); final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
registerUrlWithManager(url, context, conMan); final SSLContext sslContext;
try {
sslContext = createSSLContext(sslContextService);
} catch (final Exception e) {
throw new ProcessException(e);
}
final SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, new String[] { "TLSv1" }, null,
SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER);
final Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
.register("https", sslsf).build();
final PoolingHttpClientConnectionManager conMan = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
conMan.setDefaultMaxPerRoute(context.getMaxConcurrentTasks());
conMan.setMaxTotal(context.getMaxConcurrentTasks());
config = new Config(conMan); config = new Config(conMan);
final Config existingConfig = configMap.putIfAbsent(baseUrl, config); final Config existingConfig = configMap.putIfAbsent(baseUrl, config);
return (existingConfig == null) ? config : existingConfig; return (existingConfig == null) ? config : existingConfig;
} }
private SSLContext createSSLContext(final SSLContextService service) throws KeyStoreException, IOException, NoSuchAlgorithmException,
CertificateException, KeyManagementException, UnrecoverableKeyException
{
final KeyStore truststore = KeyStore.getInstance(service.getTrustStoreType());
try (final InputStream in = new FileInputStream(new File(service.getTrustStoreFile()))) {
truststore.load(in, service.getTrustStorePassword().toCharArray());
}
final KeyStore keystore = KeyStore.getInstance(service.getKeyStoreType());
try (final InputStream in = new FileInputStream(new File(service.getKeyStoreFile()))) {
keystore.load(in, service.getKeyStorePassword().toCharArray());
}
SSLContext sslContext = SSLContexts.custom()
.loadTrustMaterial(truststore, new TrustSelfSignedStrategy())
.loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray())
.build();
return sslContext;
}
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) { public void onTrigger(final ProcessContext context, final ProcessSession session) {
final boolean sendAsFlowFile = context.getProperty(SEND_AS_FLOWFILE).asBoolean(); final boolean sendAsFlowFile = context.getProperty(SEND_AS_FLOWFILE).asBoolean();
final int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger(); final int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
final HttpParams httpParams = new BasicHttpParams();
HttpConnectionParams.setConnectionTimeout(httpParams, context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
HttpConnectionParams.setSoTimeout(httpParams, context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
httpParams.setBooleanParameter(ClientPNames.HANDLE_REDIRECTS, false);
final String userAgent = context.getProperty(USER_AGENT).getValue(); final String userAgent = context.getProperty(USER_AGENT).getValue();
if (userAgent != null) {
httpParams.setParameter("http.useragent", userAgent);
}
final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
requestConfigBuilder.setConnectionRequestTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
requestConfigBuilder.setConnectTimeout(context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
requestConfigBuilder.setRedirectsEnabled(false);
requestConfigBuilder.setSocketTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
final RequestConfig requestConfig = requestConfigBuilder.build();
final StreamThrottler throttler = throttlerRef.get(); final StreamThrottler throttler = throttlerRef.get();
final ProcessorLog logger = getLogger(); final ProcessorLog logger = getLogger();
@ -339,7 +386,7 @@ public class PostHTTP extends AbstractProcessor {
final List<FlowFile> toSend = new ArrayList<>(); final List<FlowFile> toSend = new ArrayList<>();
DestinationAccepts destinationAccepts = null; DestinationAccepts destinationAccepts = null;
HttpClient client = null; CloseableHttpClient client = null;
final String transactionId = UUID.randomUUID().toString(); final String transactionId = UUID.randomUUID().toString();
final ObjectHolder<String> dnHolder = new ObjectHolder<>("none"); final ObjectHolder<String> dnHolder = new ObjectHolder<>("none");
@ -371,15 +418,20 @@ public class PostHTTP extends AbstractProcessor {
if (client == null || destinationAccepts == null) { if (client == null || destinationAccepts == null) {
final Config config = getConfig(url, context); final Config config = getConfig(url, context);
final ClientConnectionManager conMan = config.getConnectionManager(); final HttpClientConnectionManager conMan = config.getConnectionManager();
client = new DefaultHttpClient(conMan, httpParams);
final HttpClientBuilder clientBuilder = HttpClientBuilder.create();
((AbstractHttpClient) client).addResponseInterceptor(new HttpResponseInterceptor() { clientBuilder.setConnectionManager(conMan);
clientBuilder.setUserAgent(userAgent);
clientBuilder.addInterceptorFirst(new HttpResponseInterceptor() {
@Override @Override
public void process(final HttpResponse response, final HttpContext context) throws HttpException, IOException { public void process(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
final HttpRoutedConnection httpRoutedConnection = (HttpRoutedConnection) context.getAttribute(ExecutionContext.HTTP_CONNECTION); HttpCoreContext coreContext = HttpCoreContext.adapt(httpContext);
if (httpRoutedConnection.isSecure()) { ManagedHttpClientConnection conn = coreContext.getConnection(ManagedHttpClientConnection.class);
final X509Certificate[] certChain = httpRoutedConnection.getSSLSession().getPeerCertificateChain(); SSLSession sslSession = conn.getSSLSession();
if ( sslSession != null ) {
final X509Certificate[] certChain = sslSession.getPeerCertificateChain();
if (certChain == null || certChain.length == 0) { if (certChain == null || certChain.length == 0) {
throw new SSLPeerUnverifiedException("No certificates found"); throw new SSLPeerUnverifiedException("No certificates found");
} }
@ -389,6 +441,23 @@ public class PostHTTP extends AbstractProcessor {
} }
} }
}); });
clientBuilder.disableAutomaticRetries();
clientBuilder.disableContentCompression();
final String username = context.getProperty(USERNAME).getValue();
final String password = context.getProperty(PASSWORD).getValue();
// set the credentials if appropriate
if (username != null) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
if (password == null) {
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username));
} else {
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
};
clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
client = clientBuilder.build();
// determine whether or not destination accepts flowfile/gzip // determine whether or not destination accepts flowfile/gzip
destinationAccepts = config.getDestinationAccepts(); destinationAccepts = config.getDestinationAccepts();
@ -491,7 +560,8 @@ public class PostHTTP extends AbstractProcessor {
entity.setChunked(context.getProperty(CHUNKED_ENCODING).asBoolean()); entity.setChunked(context.getProperty(CHUNKED_ENCODING).asBoolean());
post.setEntity(entity); post.setEntity(entity);
post.setConfig(requestConfig);
final String contentType; final String contentType;
if (sendAsFlowFile) { if (sendAsFlowFile) {
if (accepts.isFlowFileV3Accepted()) { if (accepts.isFlowFileV3Accepted()) {
@ -537,7 +607,7 @@ public class PostHTTP extends AbstractProcessor {
final String uploadDataRate; final String uploadDataRate;
final long uploadMillis; final long uploadMillis;
final HttpResponse response; CloseableHttpResponse response = null;
try { try {
final StopWatch stopWatch = new StopWatch(true); final StopWatch stopWatch = new StopWatch(true);
response = client.execute(post); response = client.execute(post);
@ -556,6 +626,14 @@ public class PostHTTP extends AbstractProcessor {
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
} }
return; return;
} finally {
if ( response != null ) {
try {
response.close();
} catch (IOException e) {
getLogger().warn("Failed to close HTTP Response due to {}", new Object[] {e});
}
}
} }
// If we get a 'SEE OTHER' status code and an HTTP header that indicates that the intent // If we get a 'SEE OTHER' status code and an HTTP header that indicates that the intent
@ -686,39 +764,6 @@ public class PostHTTP extends AbstractProcessor {
} }
} }
private void registerUrlWithManager(final String url, final ProcessContext processContext, final ClientConnectionManager conMan) {
URI uriObject;
try {
uriObject = new URI(url);
} catch (URISyntaxException e) {
throw new ProcessException(e); // won't happen because of our
}
int port = uriObject.getPort();
if (port == -1) {
port = 443;
}
final boolean secure = (url.toLowerCase().startsWith("https"));
if (!secure) {
return;
}
final SSLContext sslContext = createSslContext(processContext);
final SSLSocketFactory sslSocketFactory = new SSLSocketFactory(sslContext);
final Scheme newScheme = new Scheme("https", port, sslSocketFactory);
conMan.getSchemeRegistry().register(newScheme);
}
/**
* Creates a SSL context based on the processor's optional properties.
* <p/>
*
* @return an SSLContext instance
*/
private SSLContext createSslContext(final ProcessContext context) {
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
return (sslContextService == null) ? null : sslContextService.createSSLContext(ClientAuth.REQUIRED);
}
private DestinationAccepts getDestinationAcceptance(final HttpClient client, final String uri, final ProcessorLog logger, final String transactionId) throws IOException { private DestinationAccepts getDestinationAcceptance(final HttpClient client, final String uri, final ProcessorLog logger, final String transactionId) throws IOException {
final HttpHead head = new HttpHead(uri); final HttpHead head = new HttpHead(uri);
@ -838,9 +883,9 @@ public class PostHTTP extends AbstractProcessor {
private static class Config { private static class Config {
private volatile DestinationAccepts destinationAccepts; private volatile DestinationAccepts destinationAccepts;
private final ClientConnectionManager conMan; private final HttpClientConnectionManager conMan;
public Config(final ClientConnectionManager conMan) { public Config(final HttpClientConnectionManager conMan) {
this.conMan = conMan; this.conMan = conMan;
} }
@ -852,7 +897,7 @@ public class PostHTTP extends AbstractProcessor {
this.destinationAccepts = destinationAccepts; this.destinationAccepts = destinationAccepts;
} }
public ClientConnectionManager getConnectionManager() { public HttpClientConnectionManager getConnectionManager() {
return conMan; return conMan;
} }
} }