From f8d3fcd66cd53668d339ccbcb6d2175defb63393 Mon Sep 17 00:00:00 2001 From: Peter Turcsanyi Date: Mon, 11 Mar 2024 16:45:17 +0100 Subject: [PATCH] NIFI-10707 Added proxy support in PutBigQuery Bumped GCP client library version Added grpc-* jars in service api nar in order to avoid CNFE warning in io.grpc.LoadBalancerRegistry Dependency clean-up in GCP modules Signed-off-by: Pierre Villard This closes #8491. --- .../nifi-gcp-bundle/nifi-gcp-nar/pom.xml | 196 ++++++++++++++++++ .../nifi-gcp-parameter-providers/pom.xml | 24 --- .../nifi-gcp-processors/pom.xml | 32 --- .../processors/gcp/bigquery/PutBigQuery.java | 41 +++- .../gcp/bigquery/PutBigQueryTest.java | 9 +- .../nifi-gcp-services-api/pom.xml | 29 +-- nifi-nar-bundles/nifi-gcp-bundle/pom.xml | 2 +- 7 files changed, 249 insertions(+), 84 deletions(-) diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-nar/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-nar/pom.xml index b4a71b996e..d4a17e5b2c 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-nar/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-nar/pom.xml @@ -36,11 +36,207 @@ org.apache.nifi nifi-gcp-processors 2.0.0-SNAPSHOT + + + org.codehaus.mojo + animal-sniffer-annotations + + + com.google.android + annotations + + + com.google.auto.value + auto-value-annotations + + + org.checkerframework + checker-qual + + + com.google.errorprone + error_prone_annotations + + + com.google.guava + failureaccess + + + com.google.auth + google-auth-library-credentials + + + com.google.auth + google-auth-library-oauth2-http + + + com.google.http-client + google-http-client + + + com.google.http-client + google-http-client-gson + + + io.grpc + grpc-api + + + io.grpc + grpc-context + + + io.grpc + grpc-core + + + io.grpc + grpc-util + + + com.google.code.gson + gson + + + com.google.guava + guava + + + org.apache.httpcomponents + httpclient + + + org.apache.httpcomponents + httpcore + + + com.google.j2objc + j2objc-annotations + + + com.google.code.findbugs + jsr305 + + + com.google.guava + listenablefuture + + + io.opencensus + opencensus-api + + + io.opencensus + opencensus-contrib-http-util + + + io.perfmark + perfmark-api + + org.apache.nifi nifi-gcp-parameter-providers 2.0.0-SNAPSHOT + + + org.codehaus.mojo + animal-sniffer-annotations + + + com.google.android + annotations + + + com.google.auto.value + auto-value-annotations + + + org.checkerframework + checker-qual + + + com.google.errorprone + error_prone_annotations + + + com.google.guava + failureaccess + + + com.google.auth + google-auth-library-credentials + + + com.google.auth + google-auth-library-oauth2-http + + + com.google.http-client + google-http-client + + + com.google.http-client + google-http-client-gson + + + io.grpc + grpc-api + + + io.grpc + grpc-context + + + io.grpc + grpc-core + + + io.grpc + grpc-util + + + com.google.code.gson + gson + + + com.google.guava + guava + + + org.apache.httpcomponents + httpclient + + + org.apache.httpcomponents + httpcore + + + com.google.j2objc + j2objc-annotations + + + com.google.code.findbugs + jsr305 + + + com.google.guava + listenablefuture + + + io.opencensus + opencensus-api + + + io.opencensus + opencensus-contrib-http-util + + + io.perfmark + perfmark-api + + diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-parameter-providers/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-parameter-providers/pom.xml index e53128d48d..d576e15cb6 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-parameter-providers/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-parameter-providers/pom.xml @@ -42,33 +42,9 @@ 2.0.0-SNAPSHOT provided - - org.slf4j - jcl-over-slf4j - com.google.cloud google-cloud-secretmanager - - - commons-logging - commons-logging - - - - - com.google.auth - google-auth-library-oauth2-http - - - com.google.code.findbugs - jsr305 - - - commons-logging - commons-logging - - org.apache.nifi diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml index d95d101cd1..24e2f86ec9 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml @@ -94,12 +94,6 @@ com.google.cloud google-cloud-core - - - com.google.code.findbugs - jsr305 - - com.google.cloud @@ -109,10 +103,6 @@ com.google.cloud google-cloud-bigquery - - commons-logging - commons-logging - org.json json @@ -123,10 +113,6 @@ com.google.cloud google-cloud-bigquerystorage - - commons-logging - commons-logging - org.json json @@ -136,22 +122,10 @@ com.google.cloud google-cloud-pubsub - - - commons-logging - commons-logging - - com.google.cloud google-cloud-pubsublite - - - commons-logging - commons-logging - - com.google.apis @@ -195,12 +169,6 @@ com.google.cloud google-cloud-vision - - - commons-logging - commons-logging - - diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java index a7c27e8a7e..8536f232c7 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java @@ -21,6 +21,8 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.api.gax.rpc.TransportChannelProvider; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.BQTableSchemaToProtoDescriptor; @@ -42,6 +44,7 @@ import com.google.cloud.bigquery.storage.v1.WriteStream; import com.google.cloud.bigquery.storage.v1.stub.BigQueryWriteStubSettings; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; +import io.grpc.HttpConnectProxiedSocketAddress; import io.grpc.Status; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.TriggerSerially; @@ -59,7 +62,9 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory; import org.apache.nifi.processors.gcp.bigquery.proto.ProtoUtils; +import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.record.MapRecord; @@ -67,6 +72,8 @@ import org.apache.nifi.serialization.record.Record; import java.io.IOException; import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.Proxy; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; @@ -182,7 +189,8 @@ public class PutBigQuery extends AbstractBigQueryProcessor { TRANSFER_TYPE, APPEND_RECORD_COUNT, RETRY_COUNT, - SKIP_INVALID_ROWS + SKIP_INVALID_ROWS, + ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxyAwareTransportFactory.PROXY_SPECS) ).collect(collectingAndThen(toList(), Collections::unmodifiableList)); @Override @@ -198,7 +206,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor { maxRetryCount = context.getProperty(RETRY_COUNT).asInteger(); recordBatchCount = context.getProperty(APPEND_RECORD_COUNT).asInteger(); endpoint = context.getProperty(BIGQUERY_API_ENDPOINT).evaluateAttributeExpressions().getValue(); - writeClient = createWriteClient(getGoogleCredentials(context)); + writeClient = createWriteClient(getGoogleCredentials(context), ProxyConfiguration.getConfiguration(context)); } @OnUnscheduled @@ -225,7 +233,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor { writeStream = createWriteStream(tableName); tableSchema = writeStream.getTableSchema(); protoDescriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(tableSchema); - streamWriter = createStreamWriter(writeStream.getName(), protoDescriptor, getGoogleCredentials(context)); + streamWriter = createStreamWriter(writeStream.getName(), protoDescriptor, getGoogleCredentials(context), ProxyConfiguration.getConfiguration(context)); } catch (Descriptors.DescriptorValidationException | IOException e) { getLogger().error("Failed to create Big Query Stream Writer for writing", e); context.yield(); @@ -395,12 +403,13 @@ public class PutBigQuery extends AbstractBigQueryProcessor { return writeClient.createWriteStream(createWriteStreamRequest); } - protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials) { + protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials, ProxyConfiguration proxyConfiguration) { BigQueryWriteClient client; try { BigQueryWriteSettings.Builder builder = BigQueryWriteSettings.newBuilder(); builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials)); builder.setEndpoint(endpoint); + builder.setTransportChannelProvider(createTransportChannelProvider(proxyConfiguration)); client = BigQueryWriteClient.create(builder.build()); } catch (Exception e) { @@ -410,13 +419,35 @@ public class PutBigQuery extends AbstractBigQueryProcessor { return client; } - protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials) throws IOException { + protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials, ProxyConfiguration proxyConfiguration) throws IOException { ProtoSchema protoSchema = ProtoSchemaConverter.convert(descriptor); StreamWriter.Builder builder = StreamWriter.newBuilder(streamName); builder.setWriterSchema(protoSchema); builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials)); builder.setEndpoint(endpoint); + builder.setChannelProvider(createTransportChannelProvider(proxyConfiguration)); + + return builder.build(); + } + + private TransportChannelProvider createTransportChannelProvider(ProxyConfiguration proxyConfiguration) { + InstantiatingGrpcChannelProvider.Builder builder = InstantiatingGrpcChannelProvider.newBuilder(); + + if (proxyConfiguration != null) { + if (proxyConfiguration.getProxyType() == Proxy.Type.HTTP) { + builder.setChannelConfigurator(managedChannelBuilder -> managedChannelBuilder.proxyDetector( + targetServerAddress -> HttpConnectProxiedSocketAddress.newBuilder() + .setTargetAddress((InetSocketAddress) targetServerAddress) + .setProxyAddress(new InetSocketAddress(proxyConfiguration.getProxyServerHost(), proxyConfiguration.getProxyServerPort())) + .setUsername(proxyConfiguration.getProxyUserName()) + .setPassword(proxyConfiguration.getProxyUserPassword()) + .build() + )); + } else if (proxyConfiguration.getProxyType() == Proxy.Type.SOCKS) { + getLogger().warn("Proxy type SOCKS is not supported, the proxy configuration will be ignored"); + } + } return builder.build(); } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryTest.java index 10b238220d..3ff7edf68c 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryTest.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryTest.java @@ -47,6 +47,7 @@ import org.apache.nifi.gcp.credentials.service.GCPCredentialsService; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Processor; import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService; +import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.util.TestRunner; @@ -162,12 +163,12 @@ public class PutBigQueryTest { } @Override - protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials) { + protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials, ProxyConfiguration proxyConfiguration) { return streamWriter; } @Override - protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials) { + protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials, ProxyConfiguration proxyConfiguration) { return writeClient; } }; @@ -410,12 +411,12 @@ public class PutBigQueryTest { } @Override - protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials) throws IOException { + protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials, ProxyConfiguration proxyConfiguration) throws IOException { throw new IOException(); } @Override - protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials) { + protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials, ProxyConfiguration proxyConfiguration) { return writeClient; } }; diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml index 3714d85228..354a54896f 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml @@ -33,29 +33,22 @@ com.google.auth google-auth-library-oauth2-http - - - com.google.code.findbugs - jsr305 - - - commons-logging - commons-logging - - - org.slf4j - jcl-over-slf4j + io.grpc + grpc-api - com.github.stephenc.findbugs - findbugs-annotations - 1.3.9-1 + io.grpc + grpc-context - com.fasterxml.jackson.core - jackson-databind - + io.grpc + grpc-core + + + io.grpc + grpc-util + diff --git a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml index 5ef800b8ff..64568d562d 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml @@ -27,7 +27,7 @@ pom - 26.25.0 + 26.34.0