NIFI-10707 Added proxy support in PutBigQuery

Bumped GCP client library version
Added grpc-* jars in service api nar in order to avoid CNFE warning in io.grpc.LoadBalancerRegistry
Dependency clean-up in GCP modules

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8491.
This commit is contained in:
Peter Turcsanyi 2024-03-11 16:45:17 +01:00 committed by Pierre Villard
parent 275dcc1e50
commit f8d3fcd66c
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
7 changed files with 249 additions and 84 deletions

View File

@ -36,11 +36,207 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-gcp-processors</artifactId>
<version>2.0.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.codehaus.mojo</groupId>
<artifactId>animal-sniffer-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.android</groupId>
<artifactId>annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>failureaccess</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-credentials</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client-gson</artifactId>
</exclusion>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
</exclusion>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-context</artifactId>
</exclusion>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
</exclusion>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-util</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.j2objc</groupId>
<artifactId>j2objc-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>listenablefuture</artifactId>
</exclusion>
<exclusion>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-api</artifactId>
</exclusion>
<exclusion>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-contrib-http-util</artifactId>
</exclusion>
<exclusion>
<groupId>io.perfmark</groupId>
<artifactId>perfmark-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-gcp-parameter-providers</artifactId>
<version>2.0.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.codehaus.mojo</groupId>
<artifactId>animal-sniffer-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.android</groupId>
<artifactId>annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>failureaccess</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-credentials</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client-gson</artifactId>
</exclusion>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
</exclusion>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-context</artifactId>
</exclusion>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
</exclusion>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-util</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.j2objc</groupId>
<artifactId>j2objc-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>listenablefuture</artifactId>
</exclusion>
<exclusion>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-api</artifactId>
</exclusion>
<exclusion>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-contrib-http-util</artifactId>
</exclusion>
<exclusion>
<groupId>io.perfmark</groupId>
<artifactId>perfmark-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

View File

@ -42,33 +42,9 @@
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-secretmanager</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>

View File

@ -94,12 +94,6 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-core</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
@ -109,10 +103,6 @@
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
@ -123,10 +113,6 @@
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquerystorage</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
@ -136,22 +122,10 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
@ -195,12 +169,6 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-vision</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

View File

@ -21,6 +21,8 @@ import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BQTableSchemaToProtoDescriptor;
@ -42,6 +44,7 @@ import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.storage.v1.stub.BigQueryWriteStubSettings;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import io.grpc.HttpConnectProxiedSocketAddress;
import io.grpc.Status;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
@ -59,7 +62,9 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.processors.gcp.bigquery.proto.ProtoUtils;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.MapRecord;
@ -67,6 +72,8 @@ import org.apache.nifi.serialization.record.Record;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
@ -182,7 +189,8 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
TRANSFER_TYPE,
APPEND_RECORD_COUNT,
RETRY_COUNT,
SKIP_INVALID_ROWS
SKIP_INVALID_ROWS,
ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS)
).collect(collectingAndThen(toList(), Collections::unmodifiableList));
@Override
@ -198,7 +206,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
maxRetryCount = context.getProperty(RETRY_COUNT).asInteger();
recordBatchCount = context.getProperty(APPEND_RECORD_COUNT).asInteger();
endpoint = context.getProperty(BIGQUERY_API_ENDPOINT).evaluateAttributeExpressions().getValue();
writeClient = createWriteClient(getGoogleCredentials(context));
writeClient = createWriteClient(getGoogleCredentials(context), ProxyConfiguration.getConfiguration(context));
}
@OnUnscheduled
@ -225,7 +233,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
writeStream = createWriteStream(tableName);
tableSchema = writeStream.getTableSchema();
protoDescriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(tableSchema);
streamWriter = createStreamWriter(writeStream.getName(), protoDescriptor, getGoogleCredentials(context));
streamWriter = createStreamWriter(writeStream.getName(), protoDescriptor, getGoogleCredentials(context), ProxyConfiguration.getConfiguration(context));
} catch (Descriptors.DescriptorValidationException | IOException e) {
getLogger().error("Failed to create Big Query Stream Writer for writing", e);
context.yield();
@ -395,12 +403,13 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
return writeClient.createWriteStream(createWriteStreamRequest);
}
protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials) {
protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials, ProxyConfiguration proxyConfiguration) {
BigQueryWriteClient client;
try {
BigQueryWriteSettings.Builder builder = BigQueryWriteSettings.newBuilder();
builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials));
builder.setEndpoint(endpoint);
builder.setTransportChannelProvider(createTransportChannelProvider(proxyConfiguration));
client = BigQueryWriteClient.create(builder.build());
} catch (Exception e) {
@ -410,13 +419,35 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
return client;
}
protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials) throws IOException {
protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials, ProxyConfiguration proxyConfiguration) throws IOException {
ProtoSchema protoSchema = ProtoSchemaConverter.convert(descriptor);
StreamWriter.Builder builder = StreamWriter.newBuilder(streamName);
builder.setWriterSchema(protoSchema);
builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials));
builder.setEndpoint(endpoint);
builder.setChannelProvider(createTransportChannelProvider(proxyConfiguration));
return builder.build();
}
private TransportChannelProvider createTransportChannelProvider(ProxyConfiguration proxyConfiguration) {
InstantiatingGrpcChannelProvider.Builder builder = InstantiatingGrpcChannelProvider.newBuilder();
if (proxyConfiguration != null) {
if (proxyConfiguration.getProxyType() == Proxy.Type.HTTP) {
builder.setChannelConfigurator(managedChannelBuilder -> managedChannelBuilder.proxyDetector(
targetServerAddress -> HttpConnectProxiedSocketAddress.newBuilder()
.setTargetAddress((InetSocketAddress) targetServerAddress)
.setProxyAddress(new InetSocketAddress(proxyConfiguration.getProxyServerHost(), proxyConfiguration.getProxyServerPort()))
.setUsername(proxyConfiguration.getProxyUserName())
.setPassword(proxyConfiguration.getProxyUserPassword())
.build()
));
} else if (proxyConfiguration.getProxyType() == Proxy.Type.SOCKS) {
getLogger().warn("Proxy type SOCKS is not supported, the proxy configuration will be ignored");
}
}
return builder.build();
}

View File

@ -47,6 +47,7 @@ import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.util.TestRunner;
@ -162,12 +163,12 @@ public class PutBigQueryTest {
}
@Override
protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials) {
protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials, ProxyConfiguration proxyConfiguration) {
return streamWriter;
}
@Override
protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials) {
protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials, ProxyConfiguration proxyConfiguration) {
return writeClient;
}
};
@ -410,12 +411,12 @@ public class PutBigQueryTest {
}
@Override
protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials) throws IOException {
protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials, ProxyConfiguration proxyConfiguration) throws IOException {
throw new IOException();
}
@Override
protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials) {
protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials, ProxyConfiguration proxyConfiguration) {
return writeClient;
}
};

View File

@ -33,29 +33,22 @@
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
</dependency>
<dependency>
<groupId>com.github.stephenc.findbugs</groupId>
<artifactId>findbugs-annotations</artifactId>
<version>1.3.9-1</version>
<groupId>io.grpc</groupId>
<artifactId>grpc-context</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency> <!-- TODO: Remove this when the next version of google-auth-library-oauth2-http is released and brings this in-->
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-util</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -27,7 +27,7 @@
<packaging>pom</packaging>
<properties>
<google.libraries.version>26.25.0</google.libraries.version>
<google.libraries.version>26.34.0</google.libraries.version>
</properties>
<dependencyManagement>