mirror of https://github.com/apache/nifi.git
NIFI-5282: Add ProxyConfigurationService to GCSProcessors
This closes #2981
This commit is contained in:
parent
cdae2b14b3
commit
5a58c9a171
|
@ -34,6 +34,10 @@
|
||||||
<groupId>org.apache.nifi</groupId>
|
<groupId>org.apache.nifi</groupId>
|
||||||
<artifactId>nifi-api</artifactId>
|
<artifactId>nifi-api</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-proxy-configuration-api</artifactId>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.nifi</groupId>
|
<groupId>org.apache.nifi</groupId>
|
||||||
<artifactId>nifi-processor-utils</artifactId>
|
<artifactId>nifi-processor-utils</artifactId>
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.nifi.processor.AbstractProcessor;
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
|
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
|
||||||
|
import org.apache.nifi.proxy.ProxyConfiguration;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -76,16 +77,18 @@ public abstract class AbstractGCPProcessor<
|
||||||
.addValidator(StandardValidators.INTEGER_VALIDATOR)
|
.addValidator(StandardValidators.INTEGER_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor
|
||||||
.name("Http Proxy Username")
|
.Builder().name("gcp-proxy-user-name")
|
||||||
|
.displayName("Http Proxy Username")
|
||||||
.description("Http Proxy Username")
|
.description("Http Proxy Username")
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
.required(false)
|
.required(false)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor
|
||||||
.name("Http Proxy Password")
|
.Builder().name("gcp-proxy-user-password")
|
||||||
|
.displayName("Http Proxy Password")
|
||||||
.description("Http Proxy Password")
|
.description("Http Proxy Password")
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
|
@ -120,7 +123,8 @@ public abstract class AbstractGCPProcessor<
|
||||||
PROXY_HOST,
|
PROXY_HOST,
|
||||||
PROXY_PORT,
|
PROXY_PORT,
|
||||||
HTTP_PROXY_USERNAME,
|
HTTP_PROXY_USERNAME,
|
||||||
HTTP_PROXY_PASSWORD
|
HTTP_PROXY_PASSWORD,
|
||||||
|
ProxyConfiguration.createProxyConfigPropertyDescriptor(true, ProxyAwareTransportFactory.PROXY_SPECS)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,78 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.gcp;
|
||||||
|
|
||||||
|
import com.google.api.client.http.HttpTransport;
|
||||||
|
import com.google.api.client.http.apache.ApacheHttpTransport;
|
||||||
|
import com.google.api.client.http.javanet.NetHttpTransport;
|
||||||
|
import com.google.auth.http.HttpTransportFactory;
|
||||||
|
import org.apache.http.HttpHost;
|
||||||
|
import org.apache.http.auth.AuthScope;
|
||||||
|
import org.apache.http.auth.UsernamePasswordCredentials;
|
||||||
|
import org.apache.http.conn.params.ConnRouteParams;
|
||||||
|
import org.apache.http.impl.client.BasicCredentialsProvider;
|
||||||
|
import org.apache.http.impl.client.DefaultHttpClient;
|
||||||
|
import org.apache.nifi.proxy.ProxyConfiguration;
|
||||||
|
import org.apache.nifi.proxy.ProxySpec;
|
||||||
|
|
||||||
|
import java.net.Proxy;
|
||||||
|
|
||||||
|
public class ProxyAwareTransportFactory implements HttpTransportFactory {
|
||||||
|
|
||||||
|
private static final HttpTransport DEFAULT_TRANSPORT = new NetHttpTransport();
|
||||||
|
public static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH};
|
||||||
|
|
||||||
|
private final ProxyConfiguration proxyConfig;
|
||||||
|
|
||||||
|
public ProxyAwareTransportFactory(ProxyConfiguration proxyConfig) {
|
||||||
|
this.proxyConfig = proxyConfig;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HttpTransport create() {
|
||||||
|
|
||||||
|
if (proxyConfig == null) {
|
||||||
|
return DEFAULT_TRANSPORT;
|
||||||
|
}
|
||||||
|
|
||||||
|
final Proxy proxy = proxyConfig.createProxy();
|
||||||
|
|
||||||
|
if (Proxy.Type.HTTP.equals(proxy.type()) && proxyConfig.hasCredential()) {
|
||||||
|
// If it requires authentication via username and password, use ApacheHttpTransport
|
||||||
|
final String host = proxyConfig.getProxyServerHost();
|
||||||
|
final int port = proxyConfig.getProxyServerPort();
|
||||||
|
final HttpHost proxyHost = new HttpHost(host, port);
|
||||||
|
|
||||||
|
final DefaultHttpClient httpClient = new DefaultHttpClient();
|
||||||
|
ConnRouteParams.setDefaultProxy(httpClient.getParams(), proxyHost);
|
||||||
|
|
||||||
|
if (proxyConfig.hasCredential()) {
|
||||||
|
final AuthScope proxyAuthScope = new AuthScope(host, port);
|
||||||
|
final UsernamePasswordCredentials proxyCredential
|
||||||
|
= new UsernamePasswordCredentials(proxyConfig.getProxyUserName(), proxyConfig.getProxyUserPassword());
|
||||||
|
final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
||||||
|
credentialsProvider.setCredentials(proxyAuthScope, proxyCredential);
|
||||||
|
httpClient.setCredentialsProvider(credentialsProvider);
|
||||||
|
}
|
||||||
|
|
||||||
|
return new ApacheHttpTransport(httpClient);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
return new NetHttpTransport.Builder().setProxy(proxy).build();
|
||||||
|
}
|
||||||
|
}
|
|
@ -89,21 +89,4 @@ public final class CredentialPropertyDescriptors {
|
||||||
.sensitive(true)
|
.sensitive(true)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor
|
|
||||||
.Builder().name("gcp-proxy-host")
|
|
||||||
.displayName("Proxy host")
|
|
||||||
.description("IP or hostname of the proxy to be used")
|
|
||||||
.required(false)
|
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor
|
|
||||||
.Builder().name("gcp-proxy-port")
|
|
||||||
.displayName("Proxy port")
|
|
||||||
.description("Proxy port number")
|
|
||||||
.required(false)
|
|
||||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
|
||||||
.addValidator(StandardValidators.INTEGER_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.gcp.credentials.factory;
|
package org.apache.nifi.processors.gcp.credentials.factory;
|
||||||
|
|
||||||
|
import com.google.auth.http.HttpTransportFactory;
|
||||||
import com.google.auth.oauth2.GoogleCredentials;
|
import com.google.auth.oauth2.GoogleCredentials;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.ValidationContext;
|
import org.apache.nifi.components.ValidationContext;
|
||||||
|
@ -97,8 +98,8 @@ public class CredentialsFactory {
|
||||||
*
|
*
|
||||||
* @throws IOException if there is an issue accessing the credential files
|
* @throws IOException if there is an issue accessing the credential files
|
||||||
*/
|
*/
|
||||||
public GoogleCredentials getGoogleCredentials(final Map<PropertyDescriptor, String> properties) throws IOException {
|
public GoogleCredentials getGoogleCredentials(final Map<PropertyDescriptor, String> properties, final HttpTransportFactory transportFactory) throws IOException {
|
||||||
final CredentialsStrategy primaryStrategy = selectPrimaryStrategy(properties);
|
final CredentialsStrategy primaryStrategy = selectPrimaryStrategy(properties);
|
||||||
return primaryStrategy.getGoogleCredentials(properties);
|
return primaryStrategy.getGoogleCredentials(properties, transportFactory);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.gcp.credentials.factory;
|
package org.apache.nifi.processors.gcp.credentials.factory;
|
||||||
|
|
||||||
|
import com.google.auth.http.HttpTransportFactory;
|
||||||
import com.google.auth.oauth2.GoogleCredentials;
|
import com.google.auth.oauth2.GoogleCredentials;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.ValidationContext;
|
import org.apache.nifi.components.ValidationContext;
|
||||||
|
@ -56,8 +57,9 @@ public interface CredentialsStrategy {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates an AuthCredentials instance for this strategy, given the properties defined by the user.
|
* Creates an AuthCredentials instance for this strategy, given the properties defined by the user.
|
||||||
*
|
* @param transportFactory Sub-classes should utilize this transport factory
|
||||||
|
* to support common network related configs such as proxy
|
||||||
* @throws IOException if the provided credentials cannot be accessed or are invalid
|
* @throws IOException if the provided credentials cannot be accessed or are invalid
|
||||||
*/
|
*/
|
||||||
GoogleCredentials getGoogleCredentials(Map<PropertyDescriptor, String> properties) throws IOException;
|
GoogleCredentials getGoogleCredentials(Map<PropertyDescriptor, String> properties, HttpTransportFactory transportFactory) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,13 +16,11 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.gcp.credentials.factory.strategies;
|
package org.apache.nifi.processors.gcp.credentials.factory.strategies;
|
||||||
|
|
||||||
import com.google.auth.oauth2.GoogleCredentials;
|
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.ValidationContext;
|
import org.apache.nifi.components.ValidationContext;
|
||||||
import org.apache.nifi.components.ValidationResult;
|
import org.apache.nifi.components.ValidationResult;
|
||||||
import org.apache.nifi.processors.gcp.credentials.factory.CredentialsStrategy;
|
import org.apache.nifi.processors.gcp.credentials.factory.CredentialsStrategy;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -79,9 +77,6 @@ public abstract class AbstractCredentialsStrategy implements CredentialsStrategy
|
||||||
return validationFailureResults;
|
return validationFailureResults;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public abstract GoogleCredentials getGoogleCredentials(Map<PropertyDescriptor, String> properties) throws IOException;
|
|
||||||
|
|
||||||
public String getName() {
|
public String getName() {
|
||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,18 +16,12 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.gcp.credentials.factory.strategies;
|
package org.apache.nifi.processors.gcp.credentials.factory.strategies;
|
||||||
|
|
||||||
|
import com.google.auth.http.HttpTransportFactory;
|
||||||
import com.google.auth.oauth2.GoogleCredentials;
|
import com.google.auth.oauth2.GoogleCredentials;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import com.google.api.client.http.HttpTransport;
|
|
||||||
import com.google.api.client.http.javanet.NetHttpTransport;
|
|
||||||
import com.google.auth.http.HttpTransportFactory;
|
|
||||||
import org.apache.nifi.util.StringUtils;
|
|
||||||
import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.net.Proxy;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -42,25 +36,8 @@ public abstract class AbstractServiceAccountCredentialsStrategy extends Abstract
|
||||||
protected abstract InputStream getServiceAccountJson(Map<PropertyDescriptor, String> properties) throws IOException;
|
protected abstract InputStream getServiceAccountJson(Map<PropertyDescriptor, String> properties) throws IOException;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GoogleCredentials getGoogleCredentials(Map<PropertyDescriptor, String> properties) throws IOException {
|
public GoogleCredentials getGoogleCredentials(Map<PropertyDescriptor, String> properties, HttpTransportFactory transportFactory) throws IOException {
|
||||||
final String proxyHost = properties.get(CredentialPropertyDescriptors.PROXY_HOST);
|
return GoogleCredentials.fromStream(getServiceAccountJson(properties), transportFactory);
|
||||||
final String proxyPortString = properties.get(CredentialPropertyDescriptors.PROXY_PORT);
|
|
||||||
final Integer proxyPort = (proxyPortString != null && proxyPortString.matches("-?\\d+")) ?
|
|
||||||
Integer.parseInt(proxyPortString) : 0;
|
|
||||||
|
|
||||||
if (!StringUtils.isBlank(proxyHost) && proxyPort > 0) {
|
|
||||||
return GoogleCredentials.fromStream(getServiceAccountJson(properties),
|
|
||||||
new HttpTransportFactory() {
|
|
||||||
@Override
|
|
||||||
public HttpTransport create() {
|
|
||||||
return new NetHttpTransport.Builder()
|
|
||||||
.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)))
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
return GoogleCredentials.fromStream(getServiceAccountJson(properties));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.gcp.credentials.factory.strategies;
|
package org.apache.nifi.processors.gcp.credentials.factory.strategies;
|
||||||
|
|
||||||
|
import com.google.auth.http.HttpTransportFactory;
|
||||||
import com.google.auth.oauth2.ComputeEngineCredentials;
|
import com.google.auth.oauth2.ComputeEngineCredentials;
|
||||||
import com.google.auth.oauth2.GoogleCredentials;
|
import com.google.auth.oauth2.GoogleCredentials;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
@ -36,7 +37,9 @@ public class ComputeEngineCredentialsStrategy extends AbstractBooleanCredentials
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GoogleCredentials getGoogleCredentials(Map<PropertyDescriptor, String> properties) throws IOException {
|
public GoogleCredentials getGoogleCredentials(Map<PropertyDescriptor, String> properties, HttpTransportFactory transportFactory) throws IOException {
|
||||||
return new ComputeEngineCredentials();
|
return ComputeEngineCredentials.newBuilder()
|
||||||
|
.setHttpTransportFactory(transportFactory)
|
||||||
|
.build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.gcp.credentials.factory.strategies;
|
package org.apache.nifi.processors.gcp.credentials.factory.strategies;
|
||||||
|
|
||||||
|
import com.google.auth.http.HttpTransportFactory;
|
||||||
import com.google.auth.oauth2.GoogleCredentials;
|
import com.google.auth.oauth2.GoogleCredentials;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
|
import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
|
||||||
|
@ -38,8 +39,8 @@ public class ExplicitApplicationDefaultCredentialsStrategy extends AbstractBoole
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GoogleCredentials getGoogleCredentials(Map<PropertyDescriptor, String> properties) throws IOException {
|
public GoogleCredentials getGoogleCredentials(Map<PropertyDescriptor, String> properties, HttpTransportFactory transportFactory) throws IOException {
|
||||||
return GoogleCredentials.getApplicationDefault();
|
return GoogleCredentials.getApplicationDefault(transportFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.gcp.credentials.factory.strategies;
|
package org.apache.nifi.processors.gcp.credentials.factory.strategies;
|
||||||
|
|
||||||
|
import com.google.auth.http.HttpTransportFactory;
|
||||||
import com.google.auth.oauth2.GoogleCredentials;
|
import com.google.auth.oauth2.GoogleCredentials;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
|
||||||
|
@ -35,8 +36,8 @@ public class ImplicitApplicationDefaultCredentialsStrategy extends AbstractCrede
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public GoogleCredentials getGoogleCredentials(Map<PropertyDescriptor, String> properties) throws IOException {
|
public GoogleCredentials getGoogleCredentials(Map<PropertyDescriptor, String> properties, HttpTransportFactory transportFactory) throws IOException {
|
||||||
return GoogleCredentials.getApplicationDefault();
|
return GoogleCredentials.getApplicationDefault(transportFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.gcp.credentials.service;
|
package org.apache.nifi.processors.gcp.credentials.service;
|
||||||
|
|
||||||
|
import com.google.auth.http.HttpTransportFactory;
|
||||||
import com.google.auth.oauth2.GoogleCredentials;
|
import com.google.auth.oauth2.GoogleCredentials;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
@ -25,10 +26,12 @@ import org.apache.nifi.components.ValidationContext;
|
||||||
import org.apache.nifi.components.ValidationResult;
|
import org.apache.nifi.components.ValidationResult;
|
||||||
import org.apache.nifi.controller.AbstractControllerService;
|
import org.apache.nifi.controller.AbstractControllerService;
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
|
||||||
import org.apache.nifi.processors.gcp.credentials.factory.CredentialsFactory;
|
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
|
||||||
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
|
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
|
||||||
|
import org.apache.nifi.processors.gcp.credentials.factory.CredentialsFactory;
|
||||||
|
import org.apache.nifi.proxy.ProxyConfiguration;
|
||||||
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -40,8 +43,7 @@ import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPrope
|
||||||
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
|
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
|
||||||
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.USE_APPLICATION_DEFAULT_CREDENTIALS;
|
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.USE_APPLICATION_DEFAULT_CREDENTIALS;
|
||||||
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.USE_COMPUTE_ENGINE_CREDENTIALS;
|
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.USE_COMPUTE_ENGINE_CREDENTIALS;
|
||||||
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.PROXY_HOST;
|
|
||||||
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.PROXY_PORT;
|
|
||||||
/**
|
/**
|
||||||
* Implementation of GCPCredentialsService interface
|
* Implementation of GCPCredentialsService interface
|
||||||
*
|
*
|
||||||
|
@ -63,8 +65,7 @@ public class GCPCredentialsControllerService extends AbstractControllerService i
|
||||||
props.add(USE_COMPUTE_ENGINE_CREDENTIALS);
|
props.add(USE_COMPUTE_ENGINE_CREDENTIALS);
|
||||||
props.add(SERVICE_ACCOUNT_JSON_FILE);
|
props.add(SERVICE_ACCOUNT_JSON_FILE);
|
||||||
props.add(SERVICE_ACCOUNT_JSON);
|
props.add(SERVICE_ACCOUNT_JSON);
|
||||||
props.add(PROXY_HOST);
|
props.add(ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS));
|
||||||
props.add(PROXY_PORT);
|
|
||||||
properties = Collections.unmodifiableList(props);
|
properties = Collections.unmodifiableList(props);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -82,13 +83,17 @@ public class GCPCredentialsControllerService extends AbstractControllerService i
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
|
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
|
||||||
return credentialsProviderFactory.validate(validationContext);
|
final Collection<ValidationResult> results = credentialsProviderFactory.validate(validationContext);
|
||||||
|
ProxyConfiguration.validateProxySpec(validationContext, results, ProxyAwareTransportFactory.PROXY_SPECS);
|
||||||
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnEnabled
|
@OnEnabled
|
||||||
public void onConfigured(final ConfigurationContext context) throws InitializationException {
|
public void onConfigured(final ConfigurationContext context) throws InitializationException {
|
||||||
try {
|
try {
|
||||||
googleCredentials = credentialsProviderFactory.getGoogleCredentials(context.getProperties());
|
final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context);
|
||||||
|
final HttpTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfiguration);
|
||||||
|
googleCredentials = credentialsProviderFactory.getGoogleCredentials(context.getProperties(), transportFactory);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new InitializationException(e);
|
throw new InitializationException(e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,26 +16,25 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.gcp.storage;
|
package org.apache.nifi.processors.gcp.storage;
|
||||||
|
|
||||||
import com.google.api.client.http.HttpTransport;
|
|
||||||
import com.google.api.client.http.javanet.NetHttpTransport;
|
|
||||||
import com.google.api.gax.retrying.RetrySettings;
|
import com.google.api.gax.retrying.RetrySettings;
|
||||||
import com.google.auth.http.HttpTransportFactory;
|
|
||||||
import com.google.auth.oauth2.GoogleCredentials;
|
import com.google.auth.oauth2.GoogleCredentials;
|
||||||
import com.google.cloud.http.HttpTransportOptions;
|
import com.google.cloud.http.HttpTransportOptions;
|
||||||
import com.google.cloud.storage.Storage;
|
import com.google.cloud.storage.Storage;
|
||||||
import com.google.cloud.storage.StorageOptions;
|
import com.google.cloud.storage.StorageOptions;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.components.ValidationContext;
|
||||||
|
import org.apache.nifi.components.ValidationResult;
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
import org.apache.nifi.processor.Relationship;
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
|
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
|
||||||
import org.apache.nifi.util.StringUtils;
|
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
|
||||||
|
import org.apache.nifi.proxy.ProxyConfiguration;
|
||||||
|
|
||||||
import java.net.Authenticator;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.net.PasswordAuthentication;
|
|
||||||
import java.net.Proxy;
|
import java.net.Proxy;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -71,16 +70,25 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected final Collection<ValidationResult> customValidate(ValidationContext validationContext) {
|
||||||
|
final Collection<ValidationResult> results = new ArrayList<>();
|
||||||
|
ProxyConfiguration.validateProxySpec(validationContext, results, ProxyAwareTransportFactory.PROXY_SPECS);
|
||||||
|
customValidate(validationContext, results);
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If sub-classes needs to implement any custom validation, override this method then add validation result to the results.
|
||||||
|
*/
|
||||||
|
protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected StorageOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
|
protected StorageOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
|
||||||
final String projectId = context.getProperty(PROJECT_ID).getValue();
|
final String projectId = context.getProperty(PROJECT_ID).getValue();
|
||||||
final Integer retryCount = context.getProperty(RETRY_COUNT).asInteger();
|
final Integer retryCount = context.getProperty(RETRY_COUNT).asInteger();
|
||||||
|
|
||||||
final String proxyHost = context.getProperty(PROXY_HOST).getValue();
|
|
||||||
final Integer proxyPort = context.getProperty(PROXY_PORT).asInteger();
|
|
||||||
final String proxyUser = context.getProperty(HTTP_PROXY_USERNAME).evaluateAttributeExpressions().getValue();
|
|
||||||
final String proxyPassword = context.getProperty(HTTP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
|
|
||||||
|
|
||||||
StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder()
|
StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder()
|
||||||
.setCredentials(credentials)
|
.setCredentials(credentials)
|
||||||
.setProjectId(projectId)
|
.setProjectId(projectId)
|
||||||
|
@ -88,25 +96,26 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
|
||||||
.setMaxAttempts(retryCount)
|
.setMaxAttempts(retryCount)
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
if (!StringUtils.isBlank(proxyHost) && proxyPort > 0) {
|
final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context, () -> {
|
||||||
storageOptionsBuilder.setTransportOptions(HttpTransportOptions.newBuilder().setHttpTransportFactory(new HttpTransportFactory() {
|
final String proxyHost = context.getProperty(PROXY_HOST).getValue();
|
||||||
@Override
|
final Integer proxyPort = context.getProperty(PROXY_PORT).asInteger();
|
||||||
public HttpTransport create() {
|
if (proxyHost != null && proxyPort != null && proxyPort > 0) {
|
||||||
if (!StringUtils.isBlank(proxyUser) && !StringUtils.isBlank(proxyPassword)) {
|
final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
|
||||||
Authenticator authenticator = new Authenticator() {
|
final String proxyUser = context.getProperty(HTTP_PROXY_USERNAME).evaluateAttributeExpressions().getValue();
|
||||||
public PasswordAuthentication getPasswordAuthentication() {
|
final String proxyPassword = context.getProperty(HTTP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
|
||||||
return (new PasswordAuthentication(proxyUser,
|
componentProxyConfig.setProxyType(Proxy.Type.HTTP);
|
||||||
proxyPassword.toCharArray()));
|
componentProxyConfig.setProxyServerHost(proxyHost);
|
||||||
}
|
componentProxyConfig.setProxyServerPort(proxyPort);
|
||||||
};
|
componentProxyConfig.setProxyUserName(proxyUser);
|
||||||
Authenticator.setDefault(authenticator);
|
componentProxyConfig.setProxyUserPassword(proxyPassword);
|
||||||
}
|
return componentProxyConfig;
|
||||||
return new NetHttpTransport.Builder()
|
}
|
||||||
.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)))
|
return ProxyConfiguration.DIRECT_CONFIGURATION;
|
||||||
.build();
|
});
|
||||||
}
|
|
||||||
}).build());
|
final ProxyAwareTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfiguration);
|
||||||
}
|
storageOptionsBuilder.setTransportOptions(HttpTransportOptions.newBuilder().setHttpTransportFactory(transportFactory).build());
|
||||||
|
|
||||||
return storageOptionsBuilder.build();
|
return storageOptionsBuilder.build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.gcp.credentials.factory;
|
package org.apache.nifi.processors.gcp.credentials.factory;
|
||||||
|
|
||||||
|
import com.google.api.client.http.HttpTransport;
|
||||||
|
import com.google.api.client.http.javanet.NetHttpTransport;
|
||||||
|
import com.google.auth.http.HttpTransportFactory;
|
||||||
import com.google.auth.oauth2.ComputeEngineCredentials;
|
import com.google.auth.oauth2.ComputeEngineCredentials;
|
||||||
import com.google.auth.oauth2.GoogleCredentials;
|
import com.google.auth.oauth2.GoogleCredentials;
|
||||||
import com.google.auth.oauth2.ServiceAccountCredentials;
|
import com.google.auth.oauth2.ServiceAccountCredentials;
|
||||||
|
@ -39,6 +42,9 @@ import static org.junit.Assert.assertTrue;
|
||||||
*/
|
*/
|
||||||
public class CredentialsFactoryTest {
|
public class CredentialsFactoryTest {
|
||||||
|
|
||||||
|
private static final HttpTransport TRANSPORT = new NetHttpTransport();
|
||||||
|
private static final HttpTransportFactory TRANSPORT_FACTORY = () -> TRANSPORT;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCredentialPropertyDescriptorClassCannotBeInvoked() throws Exception {
|
public void testCredentialPropertyDescriptorClassCannotBeInvoked() throws Exception {
|
||||||
Constructor constructor = CredentialPropertyDescriptors.class.getDeclaredConstructor();
|
Constructor constructor = CredentialPropertyDescriptors.class.getDeclaredConstructor();
|
||||||
|
@ -54,7 +60,7 @@ public class CredentialsFactoryTest {
|
||||||
|
|
||||||
Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
||||||
final CredentialsFactory factory = new CredentialsFactory();
|
final CredentialsFactory factory = new CredentialsFactory();
|
||||||
final GoogleCredentials credentials = factory.getGoogleCredentials(properties);
|
final GoogleCredentials credentials = factory.getGoogleCredentials(properties, TRANSPORT_FACTORY);
|
||||||
|
|
||||||
assertNotNull(credentials);
|
assertNotNull(credentials);
|
||||||
}
|
}
|
||||||
|
@ -67,7 +73,7 @@ public class CredentialsFactoryTest {
|
||||||
|
|
||||||
Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
||||||
final CredentialsFactory factory = new CredentialsFactory();
|
final CredentialsFactory factory = new CredentialsFactory();
|
||||||
final GoogleCredentials credentials = factory.getGoogleCredentials(properties);
|
final GoogleCredentials credentials = factory.getGoogleCredentials(properties, TRANSPORT_FACTORY);
|
||||||
|
|
||||||
assertNotNull(credentials);
|
assertNotNull(credentials);
|
||||||
}
|
}
|
||||||
|
@ -89,7 +95,7 @@ public class CredentialsFactoryTest {
|
||||||
|
|
||||||
Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
||||||
final CredentialsFactory factory = new CredentialsFactory();
|
final CredentialsFactory factory = new CredentialsFactory();
|
||||||
final GoogleCredentials credentials = factory.getGoogleCredentials(properties);
|
final GoogleCredentials credentials = factory.getGoogleCredentials(properties, TRANSPORT_FACTORY);
|
||||||
|
|
||||||
assertNotNull(credentials);
|
assertNotNull(credentials);
|
||||||
assertEquals("credentials class should be equal", ServiceAccountCredentials.class,
|
assertEquals("credentials class should be equal", ServiceAccountCredentials.class,
|
||||||
|
@ -117,7 +123,7 @@ public class CredentialsFactoryTest {
|
||||||
|
|
||||||
Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
||||||
final CredentialsFactory factory = new CredentialsFactory();
|
final CredentialsFactory factory = new CredentialsFactory();
|
||||||
final GoogleCredentials credentials = factory.getGoogleCredentials(properties);
|
final GoogleCredentials credentials = factory.getGoogleCredentials(properties, TRANSPORT_FACTORY);
|
||||||
|
|
||||||
assertNotNull(credentials);
|
assertNotNull(credentials);
|
||||||
assertEquals("credentials class should be equal", ServiceAccountCredentials.class,
|
assertEquals("credentials class should be equal", ServiceAccountCredentials.class,
|
||||||
|
@ -132,7 +138,7 @@ public class CredentialsFactoryTest {
|
||||||
|
|
||||||
Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
Map<PropertyDescriptor, String> properties = runner.getProcessContext().getProperties();
|
||||||
final CredentialsFactory factory = new CredentialsFactory();
|
final CredentialsFactory factory = new CredentialsFactory();
|
||||||
final GoogleCredentials credentials = factory.getGoogleCredentials(properties);
|
final GoogleCredentials credentials = factory.getGoogleCredentials(properties, TRANSPORT_FACTORY);
|
||||||
|
|
||||||
assertNotNull(credentials);
|
assertNotNull(credentials);
|
||||||
assertEquals("credentials class should be equal", ComputeEngineCredentials.class,
|
assertEquals("credentials class should be equal", ComputeEngineCredentials.class,
|
||||||
|
|
|
@ -31,6 +31,12 @@
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-standard-services-api-nar</artifactId>
|
||||||
|
<version>1.8.0-SNAPSHOT</version>
|
||||||
|
<type>nar</type>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.nifi</groupId>
|
<groupId>org.apache.nifi</groupId>
|
||||||
<artifactId>nifi-gcp-services-api</artifactId>
|
<artifactId>nifi-gcp-services-api</artifactId>
|
||||||
|
|
Loading…
Reference in New Issue