From 03ef6465478eea09704a2b6eb9be16f5001007b9 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Tue, 18 Sep 2018 23:19:03 +0200 Subject: [PATCH] NIFI-4731 This closes #3019. This closes #2682. This closes #2420. NIFI-4933 BigQuery PR Review Signed-off-by: joewitt --- .../processors/gcp/AbstractGCPProcessor.java | 47 ++- .../bigquery/AbstractBigQueryProcessor.java | 62 +++- .../gcp/bigquery/BigQueryAttributes.java | 109 +++++-- .../{BqUtils.java => BigQueryUtils.java} | 20 +- .../gcp/bigquery/PutBigQueryBatch.java | 300 +++++++++++------- .../gcp/pubsub/AbstractGCPubSubProcessor.java | 21 ++ .../gcp/storage/AbstractGCSProcessor.java | 27 +- .../nifi-gcp-services-api/pom.xml | 2 +- nifi-nar-bundles/nifi-gcp-bundle/pom.xml | 5 +- 9 files changed, 405 insertions(+), 188 deletions(-) rename nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/{BqUtils.java => BigQueryUtils.java} (94%) diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java index e95bf73830..0c360d1e0f 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java @@ -19,16 +19,20 @@ package org.apache.nifi.processors.gcp; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.Service; import com.google.cloud.ServiceOptions; +import com.google.cloud.TransportOptions; +import com.google.cloud.http.HttpTransportOptions; import com.google.common.collect.ImmutableList; + import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.gcp.credentials.service.GCPCredentialsService; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.gcp.credentials.service.GCPCredentialsService; import org.apache.nifi.proxy.ProxyConfiguration; +import java.net.Proxy; import java.util.List; /** @@ -65,7 +69,7 @@ public abstract class AbstractGCPProcessor< "-Djdk.http.auth.tunneling.disabledSchemes=\n" + "-Djdk.http.auth.proxying.disabledSchemes=") .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -74,14 +78,14 @@ public abstract class AbstractGCPProcessor< .displayName("Proxy port") .description("Proxy port number") .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .addValidator(StandardValidators.INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.PORT_VALIDATOR) .build(); public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor .Builder().name("gcp-proxy-user-name") - .displayName("Http Proxy Username") - .description("Http Proxy Username") + .displayName("HTTP Proxy Username") + .description("HTTP Proxy Username") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) 
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .required(false) @@ -89,8 +93,8 @@ public abstract class AbstractGCPProcessor< public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor .Builder().name("gcp-proxy-user-password") - .displayName("Http Proxy Password") - .description("Http Proxy Password") + .displayName("HTTP Proxy Password") + .description("HTTP Proxy Password") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .required(false) @@ -160,4 +164,31 @@ public abstract class AbstractGCPProcessor< * @see ServiceOptions */ protected abstract CloudServiceOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials); + + /** + * Builds the Transport Options containing the proxy configuration + * @param context Context to get properties + * @return Transport options object with proxy configuration + */ + protected TransportOptions getTransportOptions(ProcessContext context) { + final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context, () -> { + final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue(); + final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger(); + if (proxyHost != null && proxyPort != null && proxyPort > 0) { + final ProxyConfiguration componentProxyConfig = new ProxyConfiguration(); + final String proxyUser = context.getProperty(HTTP_PROXY_USERNAME).evaluateAttributeExpressions().getValue(); + final String proxyPassword = context.getProperty(HTTP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue(); + componentProxyConfig.setProxyType(Proxy.Type.HTTP); + componentProxyConfig.setProxyServerHost(proxyHost); + componentProxyConfig.setProxyServerPort(proxyPort); + componentProxyConfig.setProxyUserName(proxyUser); + componentProxyConfig.setProxyUserPassword(proxyPassword); + return componentProxyConfig; + } + return ProxyConfiguration.DIRECT_CONFIGURATION; + }); + + final ProxyAwareTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfiguration); + return HttpTransportOptions.newBuilder().setHttpTransportFactory(transportFactory).build(); + } } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java index b52a552fd1..c249e7ec36 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java @@ -22,15 +22,21 @@ import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; import com.google.common.collect.ImmutableList; + import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.gcp.AbstractGCPProcessor; +import 
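The getTransportOptions(ProcessContext) helper introduced in AbstractGCPProcessor above centralizes the proxy wiring that previously lived in the storage processor. As a minimal sketch (not the patch's exact code), a concrete subclass can hand the result to its client options builder; BigQueryOptions is used here purely for illustration:

@Override
protected BigQueryOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
    final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();

    final BigQueryOptions.Builder builder = BigQueryOptions.newBuilder()
            .setCredentials(credentials)
            // HttpTransportOptions built around ProxyAwareTransportFactory, so the
            // Proxy Host/Port/Username/Password properties defined above are honored
            .setTransportOptions(getTransportOptions(context));

    if (projectId != null && !projectId.isEmpty()) {
        builder.setProjectId(projectId);
    }
    return builder.build();
}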
org.apache.nifi.processors.gcp.ProxyAwareTransportFactory; +import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.util.StringUtils; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -40,7 +46,9 @@ import java.util.Set; * Base class for creating processors that connect to GCP BiqQuery service */ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor { + static final int BUFFER_SIZE = 65536; + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") .description("FlowFiles are routed to this relationship after a successful Google BigQuery operation.") @@ -53,8 +61,8 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor relationships = Collections.unmodifiableSet( new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); - public static final PropertyDescriptor DATASET = new PropertyDescriptor - .Builder().name(BigQueryAttributes.DATASET_ATTR) + public static final PropertyDescriptor DATASET = new PropertyDescriptor.Builder() + .name(BigQueryAttributes.DATASET_ATTR) .displayName("Dataset") .description(BigQueryAttributes.DATASET_DESC) .required(true) @@ -63,8 +71,8 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor getSupportedPropertyDescriptors() { return ImmutableList.builder() .addAll(super.getSupportedPropertyDescriptors()) + .add(DATASET) + .add(TABLE_NAME) + .add(TABLE_SCHEMA) + .add(READ_TIMEOUT) .build(); } @@ -109,14 +121,40 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor customValidate(ValidationContext validationContext) { + final Collection results = super.customValidate(validationContext); + ProxyConfiguration.validateProxySpec(validationContext, results, ProxyAwareTransportFactory.PROXY_SPECS); + + final boolean projectId = validationContext.getProperty(PROJECT_ID).isSet(); + if (!projectId) { + results.add(new ValidationResult.Builder() + .subject(PROJECT_ID.getName()) + .valid(false) + .explanation("The Project ID must be set for this processor.") + .build()); + } + + customValidate(validationContext, results); + return results; + } + + /** + * If sub-classes needs to implement any custom validation, override this method then add validation result to the results. 
+ */ + protected void customValidate(ValidationContext validationContext, Collection results) { + } + } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java index 380f7017fc..842a1762d0 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java @@ -17,9 +17,12 @@ package org.apache.nifi.processors.gcp.bigquery; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors; +import com.google.cloud.bigquery.JobInfo; + /** * Attributes associated with the BigQuery processors */ @@ -28,44 +31,75 @@ public class BigQueryAttributes { public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE; + // Properties + public static final String SOURCE_TYPE_ATTR = "bq.load.type"; + public static final String SOURCE_TYPE_DESC = "Data type of the file to be loaded. Possible values: AVRO, " + + "NEWLINE_DELIMITED_JSON, CSV."; + + public static final String IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown"; + public static final String IGNORE_UNKNOWN_DESC = "Sets whether BigQuery should allow extra values that are not represented " + + "in the table schema. If true, the extra values are ignored. If false, records with extra columns are treated as " + + "bad records, and if there are too many bad records, an invalid error is returned in the job result. By default " + + "unknown values are not allowed."; + + public static final String WRITE_DISPOSITION_ATTR = "bq.load.write_disposition"; + public static final String WRITE_DISPOSITION_DESC = "Sets the action that should occur if the destination table already exists."; + + public static final String MAX_BADRECORDS_ATTR = "bq.load.max_badrecords"; + public static final String MAX_BADRECORDS_DESC = "Sets the maximum number of bad records that BigQuery can ignore when running " + + "the job. If the number of bad records exceeds this value, an invalid error is returned in the job result. 
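The customValidate(ValidationContext, Collection<ValidationResult>) hook added to AbstractBigQueryProcessor above runs after the base class has checked the proxy spec and PROJECT_ID. A hypothetical override in a subclass could contribute its own results; the schema-vs-disposition rule below is illustrative only and not part of this patch:

@Override
protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
    // illustrative rule: require a table schema whenever the job is allowed to create the table
    final boolean mayCreateTable = BigQueryAttributes.CREATE_IF_NEEDED.getValue()
            .equals(validationContext.getProperty(CREATE_DISPOSITION).getValue());
    if (mayCreateTable && !validationContext.getProperty(TABLE_SCHEMA).isSet()) {
        results.add(new ValidationResult.Builder()
                .subject(TABLE_SCHEMA.getName())
                .valid(false)
                .explanation("A table schema must be provided when the load job may create the table.")
                .build());
    }
}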
By default " + + "no bad record is ignored."; + public static final String DATASET_ATTR = "bq.dataset"; - public static final String DATASET_DESC = "BigQuery dataset"; + public static final String DATASET_DESC = "BigQuery dataset name (Note - The dataset must exist in GCP)"; public static final String TABLE_NAME_ATTR = "bq.table.name"; public static final String TABLE_NAME_DESC = "BigQuery table name"; public static final String TABLE_SCHEMA_ATTR = "bq.table.schema"; - public static final String TABLE_SCHEMA_DESC = "BigQuery table name"; + public static final String TABLE_SCHEMA_DESC = "BigQuery schema in JSON format"; public static final String CREATE_DISPOSITION_ATTR = "bq.load.create_disposition"; - public static final String CREATE_DISPOSITION_DESC = "Options for table creation"; - - public static final String JOB_ERROR_MSG_ATTR = "bq.error.message"; - public static final String JOB_ERROR_MSG_DESC = "Load job error message"; - - public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason"; - public static final String JOB_ERROR_REASON_DESC = "Load job error reason"; - - public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location"; - public static final String JOB_ERROR_LOCATION_DESC = "Load job error location"; + public static final String CREATE_DISPOSITION_DESC = "Sets whether the job is allowed to create new tables"; public static final String JOB_READ_TIMEOUT_ATTR = "bq.readtimeout"; public static final String JOB_READ_TIMEOUT_DESC = "Load Job Time Out"; + public static final String CSV_ALLOW_JAGGED_ROWS_ATTR = "bq.csv.allow.jagged.rows"; + public static final String CSV_ALLOW_JAGGED_ROWS_DESC = "Set whether BigQuery should accept rows that are missing " + + "trailing optional columns. If true, BigQuery treats missing trailing columns as null values. If false, " + + "records with missing trailing columns are treated as bad records, and if there are too many bad records, " + + "an invalid error is returned in the job result. By default, rows with missing trailing columns are " + + "considered bad records."; + + public static final String CSV_ALLOW_QUOTED_NEW_LINES_ATTR = "bq.csv.allow.quoted.new.lines"; + public static final String CSV_ALLOW_QUOTED_NEW_LINES_DESC = "Sets whether BigQuery should allow quoted data sections " + + "that contain newline characters in a CSV file. By default quoted newline are not allowed."; + + public static final String CSV_CHARSET_ATTR = "bq.csv.charset"; + public static final String CSV_CHARSET_DESC = "Sets the character encoding of the data."; + + public static final String CSV_FIELD_DELIMITER_ATTR = "bq.csv.delimiter"; + public static final String CSV_FIELD_DELIMITER_DESC = "Sets the separator for fields in a CSV file. BigQuery converts " + + "the string to ISO-8859-1 encoding, and then uses the first byte of the encoded string to split the data in its " + + "raw, binary state. BigQuery also supports the escape sequence \"\t\" to specify a tab separator. The default " + + "value is a comma (',')."; + + public static final String CSV_QUOTE_ATTR = "bq.csv.quote"; + public static final String CSV_QUOTE_DESC = "Sets the value that is used to quote data sections in a CSV file. BigQuery " + + "converts the string to ISO-8859-1 encoding, and then uses the first byte of the encoded string to split the " + + "data in its raw, binary state. The default value is a double-quote ('\"'). If your data does not contain quoted " + + "sections, set the property value to an empty string. 
If your data contains quoted newline characters, you must " + + "also set the Allow Quoted New Lines property to true."; + + public static final String CSV_SKIP_LEADING_ROWS_ATTR = "bq.csv.skip.leading.rows"; + public static final String CSV_SKIP_LEADING_ROWS_DESC = "Sets the number of rows at the top of a CSV file that BigQuery " + + "will skip when reading the data. The default value is 0. This property is useful if you have header rows in the " + + "file that should be skipped."; + + // Batch Attributes - public static final String SOURCE_TYPE_ATTR = "bq.load.type"; - public static final String SOURCE_TYPE_DESC = "Data type of the file to be loaded"; - - public static final String IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown"; - public static final String IGNORE_UNKNOWN_DESC = "Ignore fields not in table schema"; - - public static final String WRITE_DISPOSITION_ATTR = "bq.load.write_disposition"; - public static final String WRITE_DISPOSITION_DESC = "Options for writing to table"; - - public static final String MAX_BADRECORDS_ATTR = "bq.load.max_badrecords"; - public static final String MAX_BADRECORDS_DESC = "Number of erroneous records to ignore before generating an error"; - public static final String JOB_CREATE_TIME_ATTR = "bq.job.stat.creation_time"; public static final String JOB_CREATE_TIME_DESC = "Time load job creation"; @@ -77,4 +111,31 @@ public class BigQueryAttributes { public static final String JOB_LINK_ATTR = "bq.job.link"; public static final String JOB_LINK_DESC = "API Link to load job"; + + public static final String JOB_NB_RECORDS_ATTR = "bq.records.count"; + public static final String JOB_NB_RECORDS_DESC = "Number of records successfully inserted"; + + public static final String JOB_ERROR_MSG_ATTR = "bq.error.message"; + public static final String JOB_ERROR_MSG_DESC = "Load job error message"; + + public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason"; + public static final String JOB_ERROR_REASON_DESC = "Load job error reason"; + + public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location"; + public static final String JOB_ERROR_LOCATION_DESC = "Load job error location"; + + + // Allowable values + public static final AllowableValue CREATE_IF_NEEDED = new AllowableValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), + JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), "Configures the job to create the table if it does not exist."); + public static final AllowableValue CREATE_NEVER = new AllowableValue(JobInfo.CreateDisposition.CREATE_NEVER.name(), + JobInfo.CreateDisposition.CREATE_NEVER.name(), "Configures the job to fail with a not-found error if the table does not exist."); + + public static final AllowableValue WRITE_EMPTY = new AllowableValue(JobInfo.WriteDisposition.WRITE_EMPTY.name(), + JobInfo.WriteDisposition.WRITE_EMPTY.name(), "Configures the job to fail with a duplicate error if the table already exists."); + public static final AllowableValue WRITE_APPEND = new AllowableValue(JobInfo.WriteDisposition.WRITE_APPEND.name(), + JobInfo.WriteDisposition.WRITE_APPEND.name(), "Configures the job to append data to the table if it already exists."); + public static final AllowableValue WRITE_TRUNCATE = new AllowableValue(JobInfo.WriteDisposition.WRITE_TRUNCATE.name(), + JobInfo.WriteDisposition.WRITE_TRUNCATE.name(), "Configures the job to overwrite the table data if table already exists."); + } \ No newline at end of file diff --git 
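The CREATE_*/WRITE_* allowable values above reuse the JobInfo enum names as their values, so mapping the configured string back to the BigQuery SDK stays a one-liner; PutBigQueryBatch does exactly this further down in the patch:

JobInfo.CreateDisposition createDisposition =
        JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue());
JobInfo.WriteDisposition writeDisposition =
        JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue());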
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java similarity index 94% rename from nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java rename to nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java index f7f5d66586..3139ffd907 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java @@ -17,24 +17,24 @@ package org.apache.nifi.processors.gcp.bigquery; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.LegacySQLTypeName; import com.google.cloud.bigquery.Schema; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; -import java.lang.reflect.Type; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - /** - * + * Util class for schema manipulation */ -public class BqUtils { - private final static Type gsonSchemaType = new TypeToken>() { - }.getType(); +public class BigQueryUtils { + + private final static Type gsonSchemaType = new TypeToken>() { }.getType(); public static Field mapToField(Map fMap) { String typeStr = fMap.get("type").toString(); @@ -77,7 +77,7 @@ public class BqUtils { } else { Gson gson = new Gson(); List fields = gson.fromJson(schemaStr, gsonSchemaType); - return Schema.of(BqUtils.listToFields(fields)); + return Schema.of(BigQueryUtils.listToFields(fields)); } } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java index 99c7f2a159..5068ab5eb1 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java @@ -18,15 +18,16 @@ package org.apache.nifi.processors.gcp.bigquery; import com.google.cloud.RetryOption; -import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.FormatOptions; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.JobStatistics.LoadStatistics; import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.TableDataWriteChannel; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.WriteChannelConfiguration; import com.google.common.collect.ImmutableList; + import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; @@ -35,7 +36,10 @@ import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; -import 
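The renamed BigQueryUtils keeps the Gson-based schema parsing. A short usage sketch of the JSON layout schemaFromString(...) expects — each array element carries at least the "name" and "type" keys that mapToField(...) reads; the field names used here are arbitrary:

String schemaJson = "[{\"name\": \"id\", \"type\": \"INTEGER\"}, "
        + "{\"name\": \"message\", \"type\": \"STRING\"}]";
Schema schema = BigQueryUtils.schemaFromString(schemaJson);
// schema.getFields() now contains the two declared fields, typed via LegacySQLTypeName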
org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.LogLevel; import org.apache.nifi.processor.ProcessContext; @@ -51,6 +55,7 @@ import org.threeten.bp.temporal.ChronoUnit; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -59,40 +64,58 @@ import java.util.concurrent.TimeUnit; /** * A processor for batch loading data into a Google BigQuery table */ - @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @Tags({"google", "google cloud", "bq", "bigquery"}) -@CapabilityDescription("Batch loads flow files to a Google BigQuery table.") +@CapabilityDescription("Batch loads flow files content to a Google BigQuery table.") @SeeAlso({PutGCSObject.class, DeleteGCSObject.class}) - @WritesAttributes({ - @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC), - @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC), - @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC), - @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC), - @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC), - @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC), - @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC), - @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC), - @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC), - @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC), - @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC), - @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC), - @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC), - @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC), - @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC) + @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC), + @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC), + @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC), + @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC), + @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, 
description = BigQueryAttributes.IGNORE_UNKNOWN_DESC), + @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC), + @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC), + @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC) }) - public class PutBigQueryBatch extends AbstractBigQueryProcessor { + private static final List TYPES = Arrays.asList(FormatOptions.json().getType(), FormatOptions.csv().getType(), FormatOptions.avro().getType()); + + private static final Validator FORMAT_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + final ValidationResult.Builder builder = new ValidationResult.Builder(); + builder.subject(subject).input(input); + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { + return builder.valid(true).explanation("Contains Expression Language").build(); + } + + if(TYPES.contains(input.toUpperCase())) { + builder.valid(true); + } else { + builder.valid(false).explanation("Load File Type must be one of the following options: " + StringUtils.join(TYPES, ", ")); + } + + return builder.build(); + } + }; + public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR) .displayName("Load file type") .description(BigQueryAttributes.SOURCE_TYPE_DESC) .required(true) - .allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType()) - .defaultValue(FormatOptions.avro().getType()) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(FORMAT_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder() @@ -102,7 +125,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor { .required(true) .addValidator(StandardValidators.BOOLEAN_VALIDATOR) .allowableValues("true", "false") - .defaultValue("true") + .defaultValue("false") .build(); public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder() @@ -110,8 +133,8 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor { .displayName("Create Disposition") 
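Because "Load file type" now supports FlowFile-scoped Expression Language, FORMAT_VALIDATOR above accepts an expression such as ${bq.load.format} at configuration time (the attribute name is only an example) and only checks literal values against TYPES. The sketch below shows how the resolved per-flow-file value ends up as a FormatOptions instance; the explicit runtime guard is an illustrative addition, not code from the patch:

final String type = context.getProperty(SOURCE_TYPE)
        .evaluateAttributeExpressions(flowFile)
        .getValue();
if (!TYPES.contains(type.toUpperCase())) {
    // illustrative guard: re-validate the resolved value before handing it to the SDK
    throw new ProcessException("Load file type must be one of " + TYPES + " but was: " + type);
}
final FormatOptions formatOption = FormatOptions.of(type);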
.description(BigQueryAttributes.CREATE_DISPOSITION_DESC) .required(true) - .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name()) - .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name()) + .allowableValues(BigQueryAttributes.CREATE_IF_NEEDED, BigQueryAttributes.CREATE_NEVER) + .defaultValue(BigQueryAttributes.CREATE_IF_NEEDED.getValue()) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -120,8 +143,8 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor { .displayName("Write Disposition") .description(BigQueryAttributes.WRITE_DISPOSITION_DESC) .required(true) - .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name()) - .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name()) + .allowableValues(BigQueryAttributes.WRITE_EMPTY, BigQueryAttributes.WRITE_APPEND, BigQueryAttributes.WRITE_TRUNCATE) + .defaultValue(BigQueryAttributes.WRITE_EMPTY.getValue()) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -134,34 +157,78 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor { .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) .build(); - private Schema schemaCache = null; + public static final PropertyDescriptor CSV_ALLOW_JAGGED_ROWS = new PropertyDescriptor.Builder() + .name(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_ATTR) + .displayName("CSV Input - Allow Jagged Rows") + .description(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_DESC) + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); - public PutBigQueryBatch() { + public static final PropertyDescriptor CSV_ALLOW_QUOTED_NEW_LINES = new PropertyDescriptor.Builder() + .name(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_ATTR) + .displayName("CSV Input - Allow Quoted New Lines") + .description(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_DESC) + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); - } + public static final PropertyDescriptor CSV_CHARSET = new PropertyDescriptor.Builder() + .name(BigQueryAttributes.CSV_CHARSET_ATTR) + .displayName("CSV Input - Character Set") + .description(BigQueryAttributes.CSV_CHARSET_DESC) + .required(true) + .allowableValues("UTF-8", "ISO-8859-1") + .defaultValue("UTF-8") + .build(); + + public static final PropertyDescriptor CSV_FIELD_DELIMITER = new PropertyDescriptor.Builder() + .name(BigQueryAttributes.CSV_FIELD_DELIMITER_ATTR) + .displayName("CSV Input - Field Delimiter") + .description(BigQueryAttributes.CSV_FIELD_DELIMITER_DESC) + .required(true) + .defaultValue(",") + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor CSV_QUOTE = new PropertyDescriptor.Builder() + .name(BigQueryAttributes.CSV_QUOTE_ATTR) + .displayName("CSV Input - Quote") + .description(BigQueryAttributes.CSV_QUOTE_DESC) + .required(true) + .defaultValue("\"") + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor CSV_SKIP_LEADING_ROWS = new PropertyDescriptor.Builder() + .name(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_ATTR) + .displayName("CSV Input - Skip Leading Rows") + .description(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_DESC) + .required(true) + 
.defaultValue("0") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); @Override public List getSupportedPropertyDescriptors() { return ImmutableList.builder() .addAll(super.getSupportedPropertyDescriptors()) - .add(DATASET) - .add(TABLE_NAME) - .add(TABLE_SCHEMA) .add(SOURCE_TYPE) .add(CREATE_DISPOSITION) .add(WRITE_DISPOSITION) .add(MAXBAD_RECORDS) .add(IGNORE_UNKNOWN) - .build(); - } - - @Override - protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .dynamic(true) + .add(CSV_ALLOW_JAGGED_ROWS) + .add(CSV_ALLOW_QUOTED_NEW_LINES) + .add(CSV_CHARSET) + .add(CSV_FIELD_DELIMITER) + .add(CSV_QUOTE) + .add(CSV_SKIP_LEADING_ROWS) .build(); } @@ -178,13 +245,10 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor { return; } - final Map attributes = new HashMap<>(); - - final BigQuery bq = getCloudService(); - final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue(); final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue(); final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final String type = context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions(flowFile).getValue(); final TableId tableId; if (StringUtils.isEmpty(projectId)) { @@ -193,70 +257,93 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor { tableId = TableId.of(projectId, dataset, tableName); } - final String fileType = context.getProperty(SOURCE_TYPE).getValue(); - - String schemaString = context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions().getValue(); - Schema schema = BqUtils.schemaFromString(schemaString); - - WriteChannelConfiguration writeChannelConfiguration = - WriteChannelConfiguration.newBuilder(tableId) - .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue())) - .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue())) - .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean()) - .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger()) - .setSchema(schema) - .setFormatOptions(FormatOptions.of(fileType)) - .build(); - - TableDataWriteChannel writer = bq.writer(writeChannelConfiguration); - try { - session.read(flowFile, rawIn -> { - ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn); - ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE); - while (readableByteChannel.read(byteBuffer) >= 0) { - byteBuffer.flip(); - writer.write(byteBuffer); - byteBuffer.clear(); - } - }); - writer.close(); + FormatOptions formatOption; - Job job = writer.getJob(); - PropertyValue property = context.getProperty(READ_TIMEOUT); - Long timePeriod = property.evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS); - Duration duration = Duration.of(timePeriod, ChronoUnit.SECONDS); - job = job.waitFor(RetryOption.totalTimeout(duration)); + if(type.equals(FormatOptions.csv().getType())) { + formatOption = FormatOptions.csv().toBuilder() + .setAllowJaggedRows(context.getProperty(CSV_ALLOW_JAGGED_ROWS).asBoolean()) + 
.setAllowQuotedNewLines(context.getProperty(CSV_ALLOW_QUOTED_NEW_LINES).asBoolean()) + .setEncoding(context.getProperty(CSV_CHARSET).getValue()) + .setFieldDelimiter(context.getProperty(CSV_FIELD_DELIMITER).evaluateAttributeExpressions(flowFile).getValue()) + .setQuote(context.getProperty(CSV_QUOTE).evaluateAttributeExpressions(flowFile).getValue()) + .setSkipLeadingRows(context.getProperty(CSV_SKIP_LEADING_ROWS).evaluateAttributeExpressions(flowFile).asInteger()) + .build(); + } else { + formatOption = FormatOptions.of(type); + } - if (job != null) { - attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(job.getStatistics().getCreationTime())); - attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime())); - attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime())); - attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink()); + final Schema schema = BigQueryUtils.schemaFromString(context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions(flowFile).getValue()); + final WriteChannelConfiguration writeChannelConfiguration = + WriteChannelConfiguration.newBuilder(tableId) + .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue())) + .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue())) + .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean()) + .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger()) + .setSchema(schema) + .setFormatOptions(formatOption) + .build(); - boolean jobError = (job.getStatus().getError() != null); + try ( TableDataWriteChannel writer = getCloudService().writer(writeChannelConfiguration) ) { - if (jobError) { - attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, job.getStatus().getError().getMessage()); - attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, job.getStatus().getError().getReason()); - attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, job.getStatus().getError().getLocation()); - } else { - // in case it got looped back from error - flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_MSG_ATTR); - flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_REASON_ATTR); - flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR); - } + session.read(flowFile, rawIn -> { + ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn); + ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE); + while (readableByteChannel.read(byteBuffer) >= 0) { + byteBuffer.flip(); + writer.write(byteBuffer); + byteBuffer.clear(); + } + }); - if (!attributes.isEmpty()) { - flowFile = session.putAllAttributes(flowFile, attributes); - } + // writer must be closed to get the job + writer.close(); - if (jobError) { - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - } else { - session.transfer(flowFile, REL_SUCCESS); + Job job = writer.getJob(); + Long timePeriod = context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.SECONDS); + Duration waitFor = Duration.of(timePeriod, ChronoUnit.SECONDS); + job = job.waitFor(RetryOption.totalTimeout(waitFor)); + + if (job != null) { + final Map attributes = new HashMap<>(); + + attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(job.getStatistics().getCreationTime())); + 
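The load path above streams the flow file content straight into a TableDataWriteChannel and only learns about the resulting Job after the channel is closed. Pulled out of the processor, the pattern looks roughly like this; bigQuery, config and in are placeholders for values built in onTrigger, the 64 KB buffer mirrors BUFFER_SIZE, and the 300-second timeout stands in for the Read Timeout property:

static Job load(BigQuery bigQuery, WriteChannelConfiguration config, InputStream in) throws Exception {
    TableDataWriteChannel writer = bigQuery.writer(config);
    try (ReadableByteChannel source = Channels.newChannel(in)) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(64 * 1024);
        while (source.read(buffer) >= 0) {
            buffer.flip();       // switch the buffer from filling to draining
            writer.write(buffer);
            buffer.clear();      // ready for the next read
        }
    } finally {
        writer.close();          // closing the channel is what submits the load job
    }
    // block until BigQuery finishes or the timeout elapses (waitFor can return null)
    return writer.getJob().waitFor(RetryOption.totalTimeout(Duration.of(300, ChronoUnit.SECONDS)));
}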
attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime())); + attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime())); + attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink()); + + boolean jobError = (job.getStatus().getError() != null); + + if (jobError) { + attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, job.getStatus().getError().getMessage()); + attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, job.getStatus().getError().getReason()); + attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, job.getStatus().getError().getLocation()); + } else { + // in case it got looped back from error + flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_MSG_ATTR); + flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_REASON_ATTR); + flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR); + + // add the number of records successfully added + if(job.getStatistics() instanceof LoadStatistics) { + final LoadStatistics stats = (LoadStatistics) job.getStatistics(); + attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(stats.getOutputRows())); + } + } + + if (!attributes.isEmpty()) { + flowFile = session.putAllAttributes(flowFile, attributes); + } + + if (jobError) { + getLogger().log(LogLevel.WARN, job.getStatus().getError().getMessage()); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } else { + session.getProvenanceReporter().send(flowFile, job.getSelfLink(), job.getStatistics().getEndTime() - job.getStatistics().getStartTime()); + session.transfer(flowFile, REL_SUCCESS); + } } } @@ -266,4 +353,5 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor { session.transfer(flowFile, REL_FAILURE); } } -} + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java index 31dc0a7fd6..8930a270bd 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java @@ -18,13 +18,17 @@ package org.apache.nifi.processors.gcp.pubsub; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.ServiceOptions; + import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.gcp.AbstractGCPProcessor; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -62,4 +66,21 @@ public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor { protected ServiceOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) { return null; } + + @Override + protected Collection customValidate(ValidationContext validationContext) { + final Collection results = 
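Condensed view of the success branch added above: the job's LoadStatistics populates the new bq.records.count attribute and the provenance event records the job's self link together with its wall-clock duration. flowFile, session and job come from the surrounding onTrigger:

if (job.getStatistics() instanceof LoadStatistics) {
    final LoadStatistics stats = (LoadStatistics) job.getStatistics();
    // number of rows BigQuery actually ingested
    flowFile = session.putAttribute(flowFile, BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(stats.getOutputRows()));
}
final long durationMillis = job.getStatistics().getEndTime() - job.getStatistics().getStartTime();
session.getProvenanceReporter().send(flowFile, job.getSelfLink(), durationMillis);
session.transfer(flowFile, REL_SUCCESS);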
super.customValidate(validationContext); + + final boolean projectId = validationContext.getProperty(PROJECT_ID).isSet(); + if (!projectId) { + results.add(new ValidationResult.Builder() + .subject(PROJECT_ID.getName()) + .valid(false) + .explanation("The Project ID must be set for this processor.") + .build()); + } + + return results; + } + } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java index 7c636312e1..fba984ef38 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java @@ -16,8 +16,6 @@ */ package org.apache.nifi.processors.gcp.storage; -import java.net.Proxy; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -36,7 +34,6 @@ import org.apache.nifi.proxy.ProxyConfiguration; import com.google.api.gax.retrying.RetrySettings; import com.google.auth.oauth2.GoogleCredentials; -import com.google.cloud.http.HttpTransportOptions; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; import com.google.common.collect.ImmutableList; @@ -73,7 +70,7 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor customValidate(ValidationContext validationContext) { - final Collection results = new ArrayList<>(); + final Collection results = super.customValidate(validationContext); ProxyConfiguration.validateProxySpec(validationContext, results, ProxyAwareTransportFactory.PROXY_SPECS); customValidate(validationContext, results); return results; @@ -96,30 +93,10 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor { - final String proxyHost = context.getProperty(PROXY_HOST).getValue(); - final Integer proxyPort = context.getProperty(PROXY_PORT).asInteger(); - if (proxyHost != null && proxyPort != null && proxyPort > 0) { - final ProxyConfiguration componentProxyConfig = new ProxyConfiguration(); - final String proxyUser = context.getProperty(HTTP_PROXY_USERNAME).evaluateAttributeExpressions().getValue(); - final String proxyPassword = context.getProperty(HTTP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue(); - componentProxyConfig.setProxyType(Proxy.Type.HTTP); - componentProxyConfig.setProxyServerHost(proxyHost); - componentProxyConfig.setProxyServerPort(proxyPort); - componentProxyConfig.setProxyUserName(proxyUser); - componentProxyConfig.setProxyUserPassword(proxyPassword); - return componentProxyConfig; - } - return ProxyConfiguration.DIRECT_CONFIGURATION; - }); - if (!projectId.isEmpty()) { storageOptionsBuilder.setProjectId(projectId); } - final ProxyAwareTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfiguration); - storageOptionsBuilder.setTransportOptions(HttpTransportOptions.newBuilder().setHttpTransportFactory(transportFactory).build()); - - return storageOptionsBuilder.build(); + return storageOptionsBuilder.setTransportOptions(getTransportOptions(context)).build(); } } diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml index 658350ee21..98aa1ee205 100644 --- 
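With the shared helper in place, AbstractGCSProcessor drops its inline ProxyConfiguration block and the end of getServiceOptions collapses to a single setTransportOptions call. A condensed sketch — the builder/credentials line is an assumption about the surrounding method, while the projectId check and the return statement appear verbatim in the diff:

StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder()
        .setCredentials(credentials);   // assumed: prepared earlier in getServiceOptions
if (!projectId.isEmpty()) {
    storageOptionsBuilder.setProjectId(projectId);
}
// proxy handling now comes from AbstractGCPProcessor.getTransportOptions()
return storageOptionsBuilder.setTransportOptions(getTransportOptions(context)).build();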
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml @@ -33,7 +33,7 @@ com.google.auth google-auth-library-oauth2-http - 0.9.0 + 0.12.0 com.google.code.findbugs diff --git a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml index bb459a88cb..c1300a433d 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml @@ -27,14 +27,15 @@ pom - 0.47.0-alpha + 0.71.0-alpha + com.google.cloud - google-cloud + google-cloud-bom ${google.cloud.sdk.version} pom import