mirror of https://github.com/apache/nifi.git
NIFI-4197 - Expose some proxy settings to GCS Processors
Signed-off-by: James Wing <jvwing@gmail.com> This closes #2017.
This commit is contained in:
parent
d9866c75e2
commit
8d4fe38bb4
|
@ -55,6 +55,25 @@ public abstract class AbstractGCPProcessor<
|
||||||
.addValidator(StandardValidators.INTEGER_VALIDATOR)
|
.addValidator(StandardValidators.INTEGER_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor
|
||||||
|
.Builder().name("gcp-proxy-host")
|
||||||
|
.displayName("Proxy host")
|
||||||
|
.description("IP or hostname of the proxy to be used")
|
||||||
|
.required(false)
|
||||||
|
.expressionLanguageSupported(false)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor
|
||||||
|
.Builder().name("gcp-proxy-port")
|
||||||
|
.displayName("Proxy port")
|
||||||
|
.description("Proxy port number")
|
||||||
|
.required(false)
|
||||||
|
.expressionLanguageSupported(false)
|
||||||
|
.addValidator(StandardValidators.INTEGER_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Links to the {@link GCPCredentialsService} which provides credentials for this particular processor.
|
* Links to the {@link GCPCredentialsService} which provides credentials for this particular processor.
|
||||||
*/
|
*/
|
||||||
|
@ -78,7 +97,9 @@ public abstract class AbstractGCPProcessor<
|
||||||
return ImmutableList.of(
|
return ImmutableList.of(
|
||||||
GCP_CREDENTIALS_PROVIDER_SERVICE,
|
GCP_CREDENTIALS_PROVIDER_SERVICE,
|
||||||
PROJECT_ID,
|
PROJECT_ID,
|
||||||
RETRY_COUNT
|
RETRY_COUNT,
|
||||||
|
PROXY_HOST,
|
||||||
|
PROXY_PORT
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.gcp.storage;
|
package org.apache.nifi.processors.gcp.storage;
|
||||||
|
|
||||||
|
import com.google.api.client.http.HttpTransport;
|
||||||
|
import com.google.api.client.http.javanet.NetHttpTransport;
|
||||||
|
import com.google.auth.http.HttpTransportFactory;
|
||||||
import com.google.auth.oauth2.GoogleCredentials;
|
import com.google.auth.oauth2.GoogleCredentials;
|
||||||
import com.google.cloud.RetryParams;
|
import com.google.cloud.RetryParams;
|
||||||
import com.google.cloud.storage.Storage;
|
import com.google.cloud.storage.Storage;
|
||||||
|
@ -26,7 +29,10 @@ import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
import org.apache.nifi.processor.Relationship;
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
|
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
|
||||||
|
import org.apache.nifi.util.StringUtils;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.Proxy;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -66,15 +72,36 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
|
||||||
@Override
|
@Override
|
||||||
protected StorageOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
|
protected StorageOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
|
||||||
final String projectId = context.getProperty(PROJECT_ID).getValue();
|
final String projectId = context.getProperty(PROJECT_ID).getValue();
|
||||||
final Integer retryCount = Integer.valueOf(context.getProperty(RETRY_COUNT).getValue());
|
final Integer retryCount = context.getProperty(RETRY_COUNT).asInteger();
|
||||||
|
|
||||||
return StorageOptions.newBuilder()
|
final String proxyHost = context.getProperty(PROXY_HOST).getValue();
|
||||||
|
final Integer proxyPort = context.getProperty(PROXY_PORT).asInteger();
|
||||||
|
|
||||||
|
StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder()
|
||||||
.setCredentials(credentials)
|
.setCredentials(credentials)
|
||||||
.setProjectId(projectId)
|
.setProjectId(projectId)
|
||||||
.setRetryParams(RetryParams.newBuilder()
|
.setRetryParams(RetryParams.newBuilder()
|
||||||
.setRetryMaxAttempts(retryCount)
|
.setRetryMaxAttempts(retryCount)
|
||||||
.setRetryMinAttempts(retryCount)
|
.setRetryMinAttempts(retryCount)
|
||||||
.build())
|
.build());
|
||||||
|
|
||||||
|
if (!StringUtils.isBlank(proxyHost) && proxyPort > 0) {
|
||||||
|
storageOptionsBuilder.setHttpTransportFactory(new HttpTransportFactory() {
|
||||||
|
@Override
|
||||||
|
public HttpTransport create() {
|
||||||
|
final HttpTransport transport = new NetHttpTransport.Builder()
|
||||||
|
.setProxy(
|
||||||
|
new Proxy(
|
||||||
|
Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)
|
||||||
|
)
|
||||||
|
)
|
||||||
.build();
|
.build();
|
||||||
|
return transport;
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return storageOptionsBuilder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue