NIFI-5282: GCPProcessor with HTTP Proxy with Auth

added http proxy support with authentication for GCP processors
added proxy support for Google Credential Service

This closes #2943.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
jugi92 2018-08-10 11:48:58 +02:00 committed by Koji Kawamura
parent 8f37b5ee10
commit cdae2b14b3
5 changed files with 83 additions and 6 deletions

View File

@ -28,7 +28,6 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
import java.util.List;
/**
@ -59,7 +58,10 @@ public abstract class AbstractGCPProcessor<
public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor
.Builder().name("gcp-proxy-host")
.displayName("Proxy host")
.description("IP or hostname of the proxy to be used")
.description("IP or hostname of the proxy to be used.\n " +
"You might need to set the following properties in bootstrap for https proxy usage:\n" +
"-Djdk.http.auth.tunneling.disabledSchemes=\n" +
"-Djdk.http.auth.proxying.disabledSchemes=")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@ -74,6 +76,22 @@ public abstract class AbstractGCPProcessor<
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor.Builder()
.name("Http Proxy Username")
.description("Http Proxy Username")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false)
.build();
public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor.Builder()
.name("Http Proxy Password")
.description("Http Proxy Password")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false)
.sensitive(true)
.build();
/**
* Links to the {@link GCPCredentialsService} which provides credentials for this particular processor.
@ -100,7 +118,9 @@ public abstract class AbstractGCPProcessor<
PROJECT_ID,
RETRY_COUNT,
PROXY_HOST,
PROXY_PORT
PROXY_PORT,
HTTP_PROXY_USERNAME,
HTTP_PROXY_PASSWORD
);
}

View File

@ -88,4 +88,22 @@ public final class CredentialPropertyDescriptors {
.description("The raw JSON containing a Service Account keyfile.")
.sensitive(true)
.build();
public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor
.Builder().name("gcp-proxy-host")
.displayName("Proxy host")
.description("IP or hostname of the proxy to be used")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor
.Builder().name("gcp-proxy-port")
.displayName("Proxy port")
.description("Proxy port number")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.build();
}

View File

@ -18,9 +18,16 @@ package org.apache.nifi.processors.gcp.credentials.factory.strategies;
import com.google.auth.oauth2.GoogleCredentials;
import org.apache.nifi.components.PropertyDescriptor;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.auth.http.HttpTransportFactory;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.util.Map;
/**
@ -36,7 +43,24 @@ public abstract class AbstractServiceAccountCredentialsStrategy extends Abstract
@Override
public GoogleCredentials getGoogleCredentials(Map<PropertyDescriptor, String> properties) throws IOException {
return GoogleCredentials.fromStream(getServiceAccountJson(properties));
final String proxyHost = properties.get(CredentialPropertyDescriptors.PROXY_HOST);
final String proxyPortString = properties.get(CredentialPropertyDescriptors.PROXY_PORT);
final Integer proxyPort = (proxyPortString != null && proxyPortString.matches("-?\\d+")) ?
Integer.parseInt(proxyPortString) : 0;
if (!StringUtils.isBlank(proxyHost) && proxyPort > 0) {
return GoogleCredentials.fromStream(getServiceAccountJson(properties),
new HttpTransportFactory() {
@Override
public HttpTransport create() {
return new NetHttpTransport.Builder()
.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)))
.build();
}
});
} else {
return GoogleCredentials.fromStream(getServiceAccountJson(properties));
}
}
}

View File

@ -40,6 +40,8 @@ import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPrope
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.USE_APPLICATION_DEFAULT_CREDENTIALS;
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.USE_COMPUTE_ENGINE_CREDENTIALS;
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.PROXY_HOST;
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.PROXY_PORT;
/**
* Implementation of GCPCredentialsService interface
*
@ -61,6 +63,8 @@ public class GCPCredentialsControllerService extends AbstractControllerService i
props.add(USE_COMPUTE_ENGINE_CREDENTIALS);
props.add(SERVICE_ACCOUNT_JSON_FILE);
props.add(SERVICE_ACCOUNT_JSON);
props.add(PROXY_HOST);
props.add(PROXY_PORT);
properties = Collections.unmodifiableList(props);
}

View File

@ -31,7 +31,9 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import org.apache.nifi.util.StringUtils;
import java.net.Authenticator;
import java.net.InetSocketAddress;
import java.net.PasswordAuthentication;
import java.net.Proxy;
import java.util.Arrays;
import java.util.Collections;
@ -76,6 +78,8 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
final String proxyHost = context.getProperty(PROXY_HOST).getValue();
final Integer proxyPort = context.getProperty(PROXY_PORT).asInteger();
final String proxyUser = context.getProperty(HTTP_PROXY_USERNAME).evaluateAttributeExpressions().getValue();
final String proxyPassword = context.getProperty(HTTP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder()
.setCredentials(credentials)
@ -88,6 +92,15 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
storageOptionsBuilder.setTransportOptions(HttpTransportOptions.newBuilder().setHttpTransportFactory(new HttpTransportFactory() {
@Override
public HttpTransport create() {
if (!StringUtils.isBlank(proxyUser) && !StringUtils.isBlank(proxyPassword)) {
Authenticator authenticator = new Authenticator() {
public PasswordAuthentication getPasswordAuthentication() {
return (new PasswordAuthentication(proxyUser,
proxyPassword.toCharArray()));
}
};
Authenticator.setDefault(authenticator);
}
return new NetHttpTransport.Builder()
.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)))
.build();
@ -96,6 +109,4 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
}
return storageOptionsBuilder.build();
}
}