mirror of https://github.com/apache/nifi.git
NIFI-2162 Updating OkHttp to 3.8.1 and OkHttp-Digest to 1.13 and refactoring InvokeHttp to adjust for changes
This closes #2004 Signed-off-by: Tony Kurc <tkurc@apache.org>
This commit is contained in:
parent
e203358bf3
commit
9367c28064
|
@ -196,14 +196,14 @@
|
|||
<artifactId>jackson-mapper-asl</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.squareup.okhttp</groupId>
|
||||
<groupId>com.squareup.okhttp3</groupId>
|
||||
<artifactId>okhttp</artifactId>
|
||||
<version>2.7.1</version>
|
||||
<version>3.8.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.burgstaller</groupId>
|
||||
<artifactId>okhttp-digest</artifactId>
|
||||
<version>0.6</version>
|
||||
<version>1.13</version>
|
||||
<type>jar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
|
|
|
@ -16,48 +16,18 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.trimToEmpty;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Proxy;
|
||||
import java.net.Proxy.Type;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import javax.net.ssl.HostnameVerifier;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.SSLSession;
|
||||
|
||||
import com.burgstaller.okhttp.AuthenticationCacheInterceptor;
|
||||
import com.burgstaller.okhttp.CachingAuthenticatorDecorator;
|
||||
import com.burgstaller.okhttp.digest.CachingAuthenticator;
|
||||
import com.burgstaller.okhttp.digest.DigestAuthenticator;
|
||||
import com.squareup.okhttp.MediaType;
|
||||
import com.squareup.okhttp.OkHttpClient;
|
||||
import com.squareup.okhttp.Request;
|
||||
import com.squareup.okhttp.RequestBody;
|
||||
import com.squareup.okhttp.Response;
|
||||
import com.squareup.okhttp.ResponseBody;
|
||||
|
||||
import okhttp3.Credentials;
|
||||
import okhttp3.MediaType;
|
||||
import okhttp3.OkHttpClient;
|
||||
import okhttp3.Request;
|
||||
import okhttp3.RequestBody;
|
||||
import okhttp3.Response;
|
||||
import okhttp3.ResponseBody;
|
||||
import okhttp3.internal.tls.OkHostnameVerifier;
|
||||
import okio.BufferedSink;
|
||||
import org.apache.commons.io.input.TeeInputStream;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
@ -83,7 +53,7 @@ import org.apache.nifi.processor.ProcessSession;
|
|||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.standard.util.MultiAuthenticator;
|
||||
import org.apache.nifi.processors.standard.util.ProxyAuthenticator;
|
||||
import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.apache.nifi.ssl.SSLContextService.ClientAuth;
|
||||
|
@ -91,6 +61,48 @@ import org.apache.nifi.stream.io.StreamUtils;
|
|||
import org.joda.time.format.DateTimeFormat;
|
||||
import org.joda.time.format.DateTimeFormatter;
|
||||
|
||||
import javax.net.ssl.HostnameVerifier;
|
||||
import javax.net.ssl.KeyManagerFactory;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.SSLSession;
|
||||
import javax.net.ssl.SSLSocketFactory;
|
||||
import javax.net.ssl.TrustManager;
|
||||
import javax.net.ssl.TrustManagerFactory;
|
||||
import javax.net.ssl.X509TrustManager;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Proxy;
|
||||
import java.net.Proxy.Type;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.KeyManagementException;
|
||||
import java.security.KeyStore;
|
||||
import java.security.KeyStoreException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.UnrecoverableKeyException;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import static org.apache.commons.lang3.StringUtils.trimToEmpty;
|
||||
|
||||
@SupportsBatching
|
||||
@Tags({"http", "https", "rest", "client"})
|
||||
@InputRequirement(Requirement.INPUT_ALLOWED)
|
||||
|
@ -500,48 +512,102 @@ public final class InvokeHTTP extends AbstractProcessor {
|
|||
}
|
||||
|
||||
@OnScheduled
|
||||
public void setUpClient(final ProcessContext context) throws IOException {
|
||||
public void setUpClient(final ProcessContext context) throws IOException, UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
|
||||
okHttpClientAtomicReference.set(null);
|
||||
|
||||
OkHttpClient okHttpClient = new OkHttpClient();
|
||||
OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient().newBuilder();
|
||||
|
||||
// Add a proxy if set
|
||||
final String proxyHost = context.getProperty(PROP_PROXY_HOST).getValue();
|
||||
final Integer proxyPort = context.getProperty(PROP_PROXY_PORT).asInteger();
|
||||
if (proxyHost != null && proxyPort != null) {
|
||||
final Proxy proxy = new Proxy(Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
|
||||
okHttpClient.setProxy(proxy);
|
||||
okHttpClientBuilder.proxy(proxy);
|
||||
}
|
||||
|
||||
// Set timeouts
|
||||
okHttpClient.setConnectTimeout((context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS);
|
||||
okHttpClient.setReadTimeout(context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS);
|
||||
okHttpClientBuilder.connectTimeout((context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS);
|
||||
okHttpClientBuilder.readTimeout(context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS);
|
||||
|
||||
// Set whether to follow redirects
|
||||
okHttpClient.setFollowRedirects(context.getProperty(PROP_FOLLOW_REDIRECTS).asBoolean());
|
||||
okHttpClientBuilder.followRedirects(context.getProperty(PROP_FOLLOW_REDIRECTS).asBoolean());
|
||||
|
||||
final SSLContextService sslService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||
final SSLContext sslContext = sslService == null ? null : sslService.createSSLContext(ClientAuth.NONE);
|
||||
|
||||
// check if the ssl context is set and add the factory if so
|
||||
if (sslContext != null) {
|
||||
okHttpClient.setSslSocketFactory(sslContext.getSocketFactory());
|
||||
setSslSocketFactory(okHttpClientBuilder, sslService, sslContext);
|
||||
}
|
||||
|
||||
// check the trusted hostname property and override the HostnameVerifier
|
||||
String trustedHostname = trimToEmpty(context.getProperty(PROP_TRUSTED_HOSTNAME).getValue());
|
||||
if (!trustedHostname.isEmpty()) {
|
||||
okHttpClient.setHostnameVerifier(new OverrideHostnameVerifier(trustedHostname, okHttpClient.getHostnameVerifier()));
|
||||
okHttpClientBuilder.hostnameVerifier(new OverrideHostnameVerifier(trustedHostname, OkHostnameVerifier.INSTANCE));
|
||||
}
|
||||
|
||||
setAuthenticator(okHttpClient, context);
|
||||
setAuthenticator(okHttpClientBuilder, context);
|
||||
|
||||
useChunked = context.getProperty(PROP_USE_CHUNKED_ENCODING).asBoolean();
|
||||
|
||||
okHttpClientAtomicReference.set(okHttpClient);
|
||||
okHttpClientAtomicReference.set(okHttpClientBuilder.build());
|
||||
}
|
||||
|
||||
private void setAuthenticator(OkHttpClient okHttpClient, ProcessContext context) {
|
||||
/*
|
||||
Overall, this method is based off of examples from OkHttp3 documentation:
|
||||
https://square.github.io/okhttp/3.x/okhttp/okhttp3/OkHttpClient.Builder.html#sslSocketFactory-javax.net.ssl.SSLSocketFactory-javax.net.ssl.X509TrustManager-
|
||||
https://github.com/square/okhttp/blob/master/samples/guide/src/main/java/okhttp3/recipes/CustomTrust.java#L156
|
||||
|
||||
In-depth documentation on Java Secure Socket Extension (JSSE) Classes and interfaces:
|
||||
https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html#JSSEClasses
|
||||
*/
|
||||
private void setSslSocketFactory(OkHttpClient.Builder okHttpClientBuilder, SSLContextService sslService, SSLContext sslContext)
|
||||
throws IOException, KeyStoreException, CertificateException, NoSuchAlgorithmException, UnrecoverableKeyException, KeyManagementException {
|
||||
final String keystoreLocation = sslService.getKeyStoreFile();
|
||||
final String keystorePass = sslService.getKeyStorePassword();
|
||||
final String keystoreType = sslService.getKeyStoreType();
|
||||
|
||||
// prepare the keystore
|
||||
final KeyStore keyStore = KeyStore.getInstance(keystoreType);
|
||||
|
||||
try (FileInputStream keyStoreStream = new FileInputStream(keystoreLocation)) {
|
||||
keyStore.load(keyStoreStream, keystorePass.toCharArray());
|
||||
}
|
||||
|
||||
final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
|
||||
keyManagerFactory.init(keyStore, keystorePass.toCharArray());
|
||||
|
||||
// load truststore
|
||||
final String truststoreLocation = sslService.getTrustStoreFile();
|
||||
final String truststorePass = sslService.getTrustStorePassword();
|
||||
final String truststoreType = sslService.getTrustStoreType();
|
||||
|
||||
KeyStore truststore = KeyStore.getInstance(truststoreType);
|
||||
final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("X509");
|
||||
truststore.load(new FileInputStream(truststoreLocation), truststorePass.toCharArray());
|
||||
trustManagerFactory.init(truststore);
|
||||
|
||||
/*
|
||||
TrustManagerFactory.getTrustManagers returns a trust manager for each type of trust material. Since we are getting a trust manager factory that uses "X509"
|
||||
as it's trust management algorithm, we are able to grab the first (and thus the most preferred) and use it as our x509 Trust Manager
|
||||
|
||||
https://docs.oracle.com/javase/8/docs/api/javax/net/ssl/TrustManagerFactory.html#getTrustManagers--
|
||||
*/
|
||||
final X509TrustManager x509TrustManager;
|
||||
TrustManager[] trustManagers = trustManagerFactory.getTrustManagers();
|
||||
if (trustManagers[0] != null) {
|
||||
x509TrustManager = (X509TrustManager) trustManagers[0];
|
||||
} else {
|
||||
throw new IllegalStateException("List of trust managers is null");
|
||||
}
|
||||
|
||||
sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);
|
||||
|
||||
final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
|
||||
okHttpClientBuilder.sslSocketFactory(sslSocketFactory, x509TrustManager);
|
||||
}
|
||||
|
||||
private void setAuthenticator(OkHttpClient.Builder okHttpClientBuilder, ProcessContext context) {
|
||||
final String authUser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue());
|
||||
final String proxyUsername = trimToEmpty(context.getProperty(PROP_PROXY_USER).getValue());
|
||||
|
||||
|
@ -550,34 +616,30 @@ public final class InvokeHTTP extends AbstractProcessor {
|
|||
final String authPass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue());
|
||||
|
||||
/*
|
||||
* Currently OkHttp doesn't have built-in Digest Auth Support. The ticket for adding it is here:
|
||||
* https://github.com/square/okhttp/issues/205#issuecomment-154047052
|
||||
* Once added this should be refactored to use the built in support. For now, a third party lib is needed.
|
||||
* OkHttp doesn't have built-in Digest Auth Support. A ticket for adding it is here[1] but they authors decided instead to rely on a 3rd party lib.
|
||||
*
|
||||
* [1] https://github.com/square/okhttp/issues/205#issuecomment-154047052
|
||||
*/
|
||||
final Map<String, CachingAuthenticator> authCache = new ConcurrentHashMap<>();
|
||||
com.burgstaller.okhttp.digest.Credentials credentials = new com.burgstaller.okhttp.digest.Credentials(authUser, authPass);
|
||||
final DigestAuthenticator digestAuthenticator = new DigestAuthenticator(credentials);
|
||||
|
||||
MultiAuthenticator authenticator = new MultiAuthenticator.Builder()
|
||||
.with("Digest", digestAuthenticator)
|
||||
.build();
|
||||
|
||||
if(!proxyUsername.isEmpty()) {
|
||||
final String proxyPassword = context.getProperty(PROP_PROXY_PASSWORD).getValue();
|
||||
authenticator.setProxyUsername(proxyUsername);
|
||||
authenticator.setProxyPassword(proxyPassword);
|
||||
ProxyAuthenticator proxyAuthenticator = new ProxyAuthenticator(proxyUsername, proxyPassword);
|
||||
|
||||
okHttpClientBuilder.proxyAuthenticator(proxyAuthenticator);
|
||||
}
|
||||
|
||||
okHttpClient.interceptors().add(new AuthenticationCacheInterceptor(authCache));
|
||||
okHttpClient.setAuthenticator(new CachingAuthenticatorDecorator(authenticator, authCache));
|
||||
okHttpClientBuilder.interceptors().add(new AuthenticationCacheInterceptor(authCache));
|
||||
okHttpClientBuilder.authenticator(new CachingAuthenticatorDecorator(digestAuthenticator, authCache));
|
||||
} else {
|
||||
// Add proxy authentication only
|
||||
if(!proxyUsername.isEmpty()) {
|
||||
final String proxyPassword = context.getProperty(PROP_PROXY_PASSWORD).getValue();
|
||||
MultiAuthenticator authenticator = new MultiAuthenticator.Builder().build();
|
||||
authenticator.setProxyUsername(proxyUsername);
|
||||
authenticator.setProxyPassword(proxyPassword);
|
||||
okHttpClient.setAuthenticator(authenticator);
|
||||
ProxyAuthenticator proxyAuthenticator = new ProxyAuthenticator(proxyUsername, proxyPassword);
|
||||
|
||||
okHttpClientBuilder.proxyAuthenticator(proxyAuthenticator);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -791,7 +853,7 @@ public final class InvokeHTTP extends AbstractProcessor {
|
|||
if (!authUser.isEmpty() && "false".equalsIgnoreCase(context.getProperty(PROP_DIGEST_AUTH).getValue())) {
|
||||
final String authPass = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_PASSWORD).getValue());
|
||||
|
||||
String credential = com.squareup.okhttp.Credentials.basic(authUser, authPass);
|
||||
String credential = Credentials.basic(authUser, authPass);
|
||||
requestBuilder = requestBuilder.header("Authorization", credential);
|
||||
}
|
||||
|
||||
|
@ -940,7 +1002,7 @@ public final class InvokeHTTP extends AbstractProcessor {
|
|||
|
||||
private void logRequest(ComponentLog logger, Request request) {
|
||||
logger.debug("\nRequest to remote service:\n\t{}\n{}",
|
||||
new Object[]{request.url().toExternalForm(), getLogString(request.headers().toMultimap())});
|
||||
new Object[]{request.url().url().toExternalForm(), getLogString(request.headers().toMultimap())});
|
||||
}
|
||||
|
||||
private void logResponse(ComponentLog logger, URL url, Response response) {
|
||||
|
@ -1001,25 +1063,24 @@ public final class InvokeHTTP extends AbstractProcessor {
|
|||
private Map<String, String> convertAttributesFromHeaders(URL url, Response responseHttp){
|
||||
// create a new hashmap to store the values from the connection
|
||||
Map<String, String> map = new HashMap<>();
|
||||
for (Map.Entry<String, List<String>> entry : responseHttp.headers().toMultimap().entrySet()) {
|
||||
String key = entry.getKey();
|
||||
if (key == null) {
|
||||
continue;
|
||||
}
|
||||
responseHttp.headers().names().forEach( (key) -> {
|
||||
if (key == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<String> values = entry.getValue();
|
||||
List<String> values = responseHttp.headers().values(key);
|
||||
|
||||
// we ignore any headers with no actual values (rare)
|
||||
if (values == null || values.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
// we ignore any headers with no actual values (rare)
|
||||
if (values == null || values.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// create a comma separated string from the values, this is stored in the map
|
||||
String value = csv(values);
|
||||
// create a comma separated string from the values, this is stored in the map
|
||||
String value = csv(values);
|
||||
|
||||
// put the csv into the map
|
||||
map.put(key, value);
|
||||
}
|
||||
// put the csv into the map
|
||||
map.put(key, value);
|
||||
});
|
||||
|
||||
if ("HTTPS".equals(url.getProtocol().toUpperCase())) {
|
||||
map.put(REMOTE_DN, responseHttp.handshake().peerPrincipal().getName());
|
||||
|
|
|
@ -16,35 +16,28 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
|
||||
import okhttp3.Authenticator;
|
||||
import okhttp3.Credentials;
|
||||
import okhttp3.Request;
|
||||
import okhttp3.Response;
|
||||
import okhttp3.Route;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.net.Proxy;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import com.burgstaller.okhttp.DispatchingAuthenticator;
|
||||
import com.squareup.okhttp.Authenticator;
|
||||
import com.squareup.okhttp.Credentials;
|
||||
import com.squareup.okhttp.Request;
|
||||
import com.squareup.okhttp.Response;
|
||||
public class ProxyAuthenticator implements Authenticator {
|
||||
|
||||
public class MultiAuthenticator extends DispatchingAuthenticator {
|
||||
public ProxyAuthenticator() {
|
||||
}
|
||||
|
||||
public MultiAuthenticator(Map<String, Authenticator> registry) {
|
||||
super(registry);
|
||||
public ProxyAuthenticator(String proxyUsername, String proxyPassword) {
|
||||
this.proxyUsername = proxyUsername;
|
||||
this.proxyPassword = proxyPassword;
|
||||
}
|
||||
|
||||
private String proxyUsername;
|
||||
private String proxyPassword;
|
||||
|
||||
@Override
|
||||
public Request authenticateProxy(Proxy proxy, Response response) throws IOException {
|
||||
String credential = Credentials.basic(proxyUsername, proxyPassword);
|
||||
return response.request()
|
||||
.newBuilder()
|
||||
.header("Proxy-Authorization", credential)
|
||||
.build();
|
||||
}
|
||||
|
||||
public void setProxyUsername(String proxyUsername) {
|
||||
this.proxyUsername = proxyUsername;
|
||||
}
|
||||
|
@ -53,17 +46,13 @@ public class MultiAuthenticator extends DispatchingAuthenticator {
|
|||
this.proxyPassword = proxyPassword;
|
||||
}
|
||||
|
||||
public static final class Builder {
|
||||
Map<String, Authenticator> registry = new HashMap<>();
|
||||
|
||||
public Builder with(String scheme, Authenticator authenticator) {
|
||||
registry.put(scheme, authenticator);
|
||||
return this;
|
||||
}
|
||||
|
||||
public MultiAuthenticator build() {
|
||||
return new MultiAuthenticator(registry);
|
||||
}
|
||||
@Nullable
|
||||
@Override
|
||||
public Request authenticate(Route route, Response response) throws IOException {
|
||||
String credential = Credentials.basic(proxyUsername, proxyPassword);
|
||||
return response.request()
|
||||
.newBuilder()
|
||||
.header("Proxy-Authorization", credential)
|
||||
.build();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue