mirror of https://github.com/apache/nifi.git
NIFI-1751 Added proxy authentication in InvokeHttp processor
This closes #398. Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
parent
c360b57a1e
commit
ef192cc859
|
@ -49,17 +49,15 @@ import javax.net.ssl.SSLSession;
|
|||
|
||||
import com.burgstaller.okhttp.AuthenticationCacheInterceptor;
|
||||
import com.burgstaller.okhttp.CachingAuthenticatorDecorator;
|
||||
import com.burgstaller.okhttp.DispatchingAuthenticator;
|
||||
import com.burgstaller.okhttp.digest.CachingAuthenticator;
|
||||
import com.burgstaller.okhttp.digest.DigestAuthenticator;
|
||||
|
||||
import com.squareup.okhttp.MediaType;
|
||||
import com.squareup.okhttp.OkHttpClient;
|
||||
import com.squareup.okhttp.Request;
|
||||
import com.squareup.okhttp.RequestBody;
|
||||
import com.squareup.okhttp.Response;
|
||||
|
||||
import com.squareup.okhttp.ResponseBody;
|
||||
|
||||
import okio.BufferedSink;
|
||||
import org.apache.commons.io.input.TeeInputStream;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
@ -85,6 +83,7 @@ import org.apache.nifi.processor.ProcessSession;
|
|||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.standard.util.MultiAuthenticator;
|
||||
import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.apache.nifi.ssl.SSLContextService.ClientAuth;
|
||||
|
@ -214,6 +213,23 @@ public final class InvokeHTTP extends AbstractProcessor {
|
|||
.addValidator(StandardValidators.PORT_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_PROXY_USER = new PropertyDescriptor.Builder()
|
||||
.name("invokehttp-proxy-user")
|
||||
.displayName("Proxy Username")
|
||||
.description("Username to set when authenticating against proxy")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_PROXY_PASSWORD = new PropertyDescriptor.Builder()
|
||||
.name("invokehttp-proxy-password")
|
||||
.displayName("Proxy Password")
|
||||
.description("Password to set when authenticating against proxy")
|
||||
.required(false)
|
||||
.sensitive(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_CONTENT_TYPE = new PropertyDescriptor.Builder()
|
||||
.name("Content-Type")
|
||||
.description("The Content-Type to specify for when content is being transmitted through a PUT or POST. "
|
||||
|
@ -349,6 +365,8 @@ public final class InvokeHTTP extends AbstractProcessor {
|
|||
PROP_BASIC_AUTH_PASSWORD,
|
||||
PROP_PROXY_HOST,
|
||||
PROP_PROXY_PORT,
|
||||
PROP_PROXY_USER,
|
||||
PROP_PROXY_PASSWORD,
|
||||
PROP_PUT_OUTPUT_IN_ATTRIBUTE,
|
||||
PROP_PUT_ATTRIBUTE_MAX_LENGTH,
|
||||
PROP_DIGEST_AUTH,
|
||||
|
@ -454,7 +472,7 @@ public final class InvokeHTTP extends AbstractProcessor {
|
|||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
|
||||
final List<ValidationResult> results = new ArrayList<>(1);
|
||||
final List<ValidationResult> results = new ArrayList<>(3);
|
||||
final boolean proxyHostSet = validationContext.getProperty(PROP_PROXY_HOST).isSet();
|
||||
final boolean proxyPortSet = validationContext.getProperty(PROP_PROXY_PORT).isSet();
|
||||
|
||||
|
@ -462,6 +480,16 @@ public final class InvokeHTTP extends AbstractProcessor {
|
|||
results.add(new ValidationResult.Builder().subject("Proxy Host and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both must be set").build());
|
||||
}
|
||||
|
||||
final boolean proxyUserSet = validationContext.getProperty(PROP_PROXY_USER).isSet();
|
||||
final boolean proxyPwdSet = validationContext.getProperty(PROP_PROXY_PASSWORD).isSet();
|
||||
|
||||
if ((proxyUserSet && !proxyPwdSet) || (!proxyUserSet && proxyPwdSet)) {
|
||||
results.add(new ValidationResult.Builder().subject("Proxy User and Password").valid(false).explanation("If Proxy Username or Proxy Password is set, both must be set").build());
|
||||
}
|
||||
if(proxyUserSet && !proxyHostSet) {
|
||||
results.add(new ValidationResult.Builder().subject("Proxy").valid(false).explanation("If Proxy username is set, proxy host must be set").build());
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
|
@ -500,7 +528,16 @@ public final class InvokeHTTP extends AbstractProcessor {
|
|||
okHttpClient.setHostnameVerifier(new OverrideHostnameVerifier(trustedHostname, okHttpClient.getHostnameVerifier()));
|
||||
}
|
||||
|
||||
setAuthenticator(okHttpClient, context);
|
||||
|
||||
useChunked = context.getProperty(PROP_USE_CHUNKED_ENCODING).asBoolean();
|
||||
|
||||
okHttpClientAtomicReference.set(okHttpClient);
|
||||
}
|
||||
|
||||
private void setAuthenticator(OkHttpClient okHttpClient, ProcessContext context) {
|
||||
final String authUser = trimToEmpty(context.getProperty(PROP_BASIC_AUTH_USERNAME).getValue());
|
||||
final String proxyUsername = trimToEmpty(context.getProperty(PROP_PROXY_USER).getValue());
|
||||
|
||||
// If the username/password properties are set then check if digest auth is being used
|
||||
if (!authUser.isEmpty() && "true".equalsIgnoreCase(context.getProperty(PROP_DIGEST_AUTH).getValue())) {
|
||||
|
@ -512,22 +549,31 @@ public final class InvokeHTTP extends AbstractProcessor {
|
|||
* Once added this should be refactored to use the built in support. For now, a third party lib is needed.
|
||||
*/
|
||||
final Map<String, CachingAuthenticator> authCache = new ConcurrentHashMap<>();
|
||||
|
||||
com.burgstaller.okhttp.digest.Credentials credentials = new com.burgstaller.okhttp.digest.Credentials(authUser, authPass);
|
||||
|
||||
final DigestAuthenticator digestAuthenticator = new DigestAuthenticator(credentials);
|
||||
|
||||
DispatchingAuthenticator authenticator = new DispatchingAuthenticator.Builder()
|
||||
MultiAuthenticator authenticator = new MultiAuthenticator.Builder()
|
||||
.with("Digest", digestAuthenticator)
|
||||
.build();
|
||||
|
||||
if(!proxyUsername.isEmpty()) {
|
||||
final String proxyPassword = context.getProperty(PROP_PROXY_PASSWORD).getValue();
|
||||
authenticator.setProxyUsername(proxyUsername);
|
||||
authenticator.setProxyPassword(proxyPassword);
|
||||
}
|
||||
|
||||
okHttpClient.interceptors().add(new AuthenticationCacheInterceptor(authCache));
|
||||
okHttpClient.setAuthenticator(new CachingAuthenticatorDecorator(authenticator, authCache));
|
||||
} else {
|
||||
// Add proxy authentication only
|
||||
if(!proxyUsername.isEmpty()) {
|
||||
final String proxyPassword = context.getProperty(PROP_PROXY_PASSWORD).getValue();
|
||||
MultiAuthenticator authenticator = new MultiAuthenticator.Builder().build();
|
||||
authenticator.setProxyUsername(proxyUsername);
|
||||
authenticator.setProxyPassword(proxyPassword);
|
||||
okHttpClient.setAuthenticator(authenticator);
|
||||
}
|
||||
}
|
||||
|
||||
useChunked = context.getProperty(PROP_USE_CHUNKED_ENCODING).asBoolean();
|
||||
|
||||
okHttpClientAtomicReference.set(okHttpClient);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard.util;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.Proxy;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import com.burgstaller.okhttp.DispatchingAuthenticator;
|
||||
import com.squareup.okhttp.Authenticator;
|
||||
import com.squareup.okhttp.Credentials;
|
||||
import com.squareup.okhttp.Request;
|
||||
import com.squareup.okhttp.Response;
|
||||
|
||||
public class MultiAuthenticator extends DispatchingAuthenticator {
|
||||
|
||||
public MultiAuthenticator(Map<String, Authenticator> registry) {
|
||||
super(registry);
|
||||
}
|
||||
|
||||
private String proxyUsername;
|
||||
private String proxyPassword;
|
||||
|
||||
@Override
|
||||
public Request authenticateProxy(Proxy proxy, Response response) throws IOException {
|
||||
String credential = Credentials.basic(proxyUsername, proxyPassword);
|
||||
return response.request()
|
||||
.newBuilder()
|
||||
.header("Proxy-Authorization", credential)
|
||||
.build();
|
||||
}
|
||||
|
||||
public void setProxyUsername(String proxyUsername) {
|
||||
this.proxyUsername = proxyUsername;
|
||||
}
|
||||
|
||||
public void setProxyPassword(String proxyPassword) {
|
||||
this.proxyPassword = proxyPassword;
|
||||
}
|
||||
|
||||
public static final class Builder {
|
||||
Map<String, Authenticator> registry = new HashMap<>();
|
||||
|
||||
public Builder with(String scheme, Authenticator authenticator) {
|
||||
registry.put(scheme, authenticator);
|
||||
return this;
|
||||
}
|
||||
|
||||
public MultiAuthenticator build() {
|
||||
return new MultiAuthenticator(registry);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -144,6 +144,16 @@ public class TestInvokeHTTP extends TestInvokeHttpCommon {
|
|||
}
|
||||
runner.setProperty(InvokeHTTP.PROP_PROXY_PORT, String.valueOf(proxyURL.getPort()));
|
||||
|
||||
runner.setProperty(InvokeHTTP.PROP_PROXY_USER, "username");
|
||||
|
||||
try{
|
||||
runner.run();
|
||||
Assert.fail();
|
||||
} catch (AssertionError e){
|
||||
// Expect assetion error when proxy password isn't set but host is.
|
||||
}
|
||||
runner.setProperty(InvokeHTTP.PROP_PROXY_PASSWORD, "password");
|
||||
|
||||
createFlowFiles(runner);
|
||||
|
||||
runner.run();
|
||||
|
|
Loading…
Reference in New Issue