From 7485687d4bd71bf8637cffce7e86c4bc90857cf7 Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Tue, 6 Jun 2023 14:44:27 -0500 Subject: [PATCH] NIFI-11657 Removed Deprecated PutBigQueryBatch and PutBigQueryStreaming Signed-off-by: Pierre Villard This closes #7351. --- .../gcp/bigquery/BigQueryAttributes.java | 98 ----- .../gcp/bigquery/BigQueryUtils.java | 83 ---- .../processors/gcp/bigquery/PutBigQuery.java | 2 - .../gcp/bigquery/PutBigQueryBatch.java | 364 ------------------ .../gcp/bigquery/PutBigQueryStreaming.java | 217 ----------- .../org.apache.nifi.processor.Processor | 2 - .../gcp/bigquery/PutBigQueryBatchTest.java | 158 -------- 7 files changed, 924 deletions(-) delete mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java delete mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java delete mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java delete mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java index babfb54ee6..f26f6cefe5 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java @@ -17,12 +17,6 @@ package org.apache.nifi.processors.gcp.bigquery; -import org.apache.nifi.components.AllowableValue; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors; - -import com.google.cloud.bigquery.JobInfo; - /** * Attributes associated with the BigQuery processors */ @@ -30,79 +24,19 @@ public class BigQueryAttributes { private BigQueryAttributes() { } - public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE; - // Properties - public static final String SOURCE_TYPE_ATTR = "bq.load.type"; - public static final String SOURCE_TYPE_DESC = "Data type of the file to be loaded. Possible values: AVRO, " - + "NEWLINE_DELIMITED_JSON, CSV."; - public static final String IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown"; public static final String IGNORE_UNKNOWN_DESC = "Sets whether BigQuery should allow extra values that are not represented " + "in the table schema. If true, the extra values are ignored. If false, records with extra columns are treated as " + "bad records, and if there are too many bad records, an invalid error is returned in the job result. By default " + "unknown values are not allowed."; - public static final String WRITE_DISPOSITION_ATTR = "bq.load.write_disposition"; - public static final String WRITE_DISPOSITION_DESC = "Sets the action that should occur if the destination table already exists."; - - public static final String MAX_BADRECORDS_ATTR = "bq.load.max_badrecords"; - public static final String MAX_BADRECORDS_DESC = "Sets the maximum number of bad records that BigQuery can ignore when running " - + "the job. If the number of bad records exceeds this value, an invalid error is returned in the job result. By default " - + "no bad record is ignored."; - public static final String DATASET_ATTR = "bq.dataset"; public static final String DATASET_DESC = "BigQuery dataset name (Note - The dataset must exist in GCP)"; public static final String TABLE_NAME_ATTR = "bq.table.name"; public static final String TABLE_NAME_DESC = "BigQuery table name"; - public static final String TABLE_SCHEMA_ATTR = "bq.table.schema"; - public static final String TABLE_SCHEMA_DESC = "BigQuery schema in JSON format"; - - public static final String CREATE_DISPOSITION_ATTR = "bq.load.create_disposition"; - public static final String CREATE_DISPOSITION_DESC = "Sets whether the job is allowed to create new tables"; - - public static final String JOB_READ_TIMEOUT_ATTR = "bq.readtimeout"; - public static final String JOB_READ_TIMEOUT_DESC = "Load Job Time Out"; - - public static final String CSV_ALLOW_JAGGED_ROWS_ATTR = "bq.csv.allow.jagged.rows"; - public static final String CSV_ALLOW_JAGGED_ROWS_DESC = "Set whether BigQuery should accept rows that are missing " - + "trailing optional columns. If true, BigQuery treats missing trailing columns as null values. If false, " - + "records with missing trailing columns are treated as bad records, and if there are too many bad records, " - + "an invalid error is returned in the job result. By default, rows with missing trailing columns are " - + "considered bad records."; - - public static final String CSV_ALLOW_QUOTED_NEW_LINES_ATTR = "bq.csv.allow.quoted.new.lines"; - public static final String CSV_ALLOW_QUOTED_NEW_LINES_DESC = "Sets whether BigQuery should allow quoted data sections " - + "that contain newline characters in a CSV file. By default quoted newline are not allowed."; - - public static final String CSV_CHARSET_ATTR = "bq.csv.charset"; - public static final String CSV_CHARSET_DESC = "Sets the character encoding of the data."; - - public static final String CSV_FIELD_DELIMITER_ATTR = "bq.csv.delimiter"; - public static final String CSV_FIELD_DELIMITER_DESC = "Sets the separator for fields in a CSV file. BigQuery converts " - + "the string to ISO-8859-1 encoding, and then uses the first byte of the encoded string to split the data in its " - + "raw, binary state. BigQuery also supports the escape sequence \"\t\" to specify a tab separator. The default " - + "value is a comma (',')."; - - public static final String CSV_QUOTE_ATTR = "bq.csv.quote"; - public static final String CSV_QUOTE_DESC = "Sets the value that is used to quote data sections in a CSV file. BigQuery " - + "converts the string to ISO-8859-1 encoding, and then uses the first byte of the encoded string to split the " - + "data in its raw, binary state. The default value is a double-quote ('\"'). If your data does not contain quoted " - + "sections, set the property value to an empty string. If your data contains quoted newline characters, you must " - + "also set the Allow Quoted New Lines property to true."; - - public static final String CSV_SKIP_LEADING_ROWS_ATTR = "bq.csv.skip.leading.rows"; - public static final String CSV_SKIP_LEADING_ROWS_DESC = "Sets the number of rows at the top of a CSV file that BigQuery " - + "will skip when reading the data. The default value is 0. This property is useful if you have header rows in the " - + "file that should be skipped."; - - public static final String AVRO_USE_LOGICAL_TYPES_ATTR = "bq.avro.use.logical.types"; - public static final String AVRO_USE_LOGICAL_TYPES_DESC = "If format is set to Avro and if this option is set to true, you " - + "can interpret logical types into their corresponding types (such as TIMESTAMP) instead of only using their raw " - + "types (such as INTEGER)."; - public static final String RECORD_READER_ATTR = "bq.record.reader"; public static final String RECORD_READER_DESC = "Specifies the Controller Service to use for parsing incoming data."; @@ -111,44 +45,12 @@ public class BigQueryAttributes { + "rows exist. If not set the entire insert request will fail if it contains an invalid row."; // Batch Attributes - public static final String JOB_CREATE_TIME_ATTR = "bq.job.stat.creation_time"; - public static final String JOB_CREATE_TIME_DESC = "Time load job creation"; - - public static final String JOB_END_TIME_ATTR = "bq.job.stat.end_time"; - public static final String JOB_END_TIME_DESC = "Time load job ended"; - - public static final String JOB_START_TIME_ATTR = "bq.job.stat.start_time"; - public static final String JOB_START_TIME_DESC = "Time load job started"; - - public static final String JOB_LINK_ATTR = "bq.job.link"; - public static final String JOB_LINK_DESC = "API Link to load job"; - - public static final String JOB_ID_ATTR = "bq.job.id"; - public static final String JOB_ID_DESC = "ID of the BigQuery job"; - public static final String JOB_NB_RECORDS_ATTR = "bq.records.count"; public static final String JOB_NB_RECORDS_DESC = "Number of records successfully inserted"; public static final String JOB_ERROR_MSG_ATTR = "bq.error.message"; - public static final String JOB_ERROR_MSG_DESC = "Load job error message"; public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason"; - public static final String JOB_ERROR_REASON_DESC = "Load job error reason"; public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location"; - public static final String JOB_ERROR_LOCATION_DESC = "Load job error location"; - - // Allowable values - public static final AllowableValue CREATE_IF_NEEDED = new AllowableValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), - JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), "Configures the job to create the table if it does not exist."); - public static final AllowableValue CREATE_NEVER = new AllowableValue(JobInfo.CreateDisposition.CREATE_NEVER.name(), - JobInfo.CreateDisposition.CREATE_NEVER.name(), "Configures the job to fail with a not-found error if the table does not exist."); - - public static final AllowableValue WRITE_EMPTY = new AllowableValue(JobInfo.WriteDisposition.WRITE_EMPTY.name(), - JobInfo.WriteDisposition.WRITE_EMPTY.name(), "Configures the job to fail with a duplicate error if the table already exists."); - public static final AllowableValue WRITE_APPEND = new AllowableValue(JobInfo.WriteDisposition.WRITE_APPEND.name(), - JobInfo.WriteDisposition.WRITE_APPEND.name(), "Configures the job to append data to the table if it already exists."); - public static final AllowableValue WRITE_TRUNCATE = new AllowableValue(JobInfo.WriteDisposition.WRITE_TRUNCATE.name(), - JobInfo.WriteDisposition.WRITE_TRUNCATE.name(), "Configures the job to overwrite the table data if table already exists."); - } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java deleted file mode 100644 index 1b621e60db..0000000000 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.processors.gcp.bigquery; - -import java.lang.reflect.Type; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import com.google.cloud.bigquery.Field; -import com.google.cloud.bigquery.LegacySQLTypeName; -import com.google.cloud.bigquery.Schema; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; - -/** - * Util class for schema manipulation - */ -public class BigQueryUtils { - - private final static Type gsonSchemaType = new TypeToken>() { }.getType(); - - public static Field mapToField(Map fMap) { - String typeStr = fMap.get("type").toString(); - String nameStr = fMap.get("name").toString(); - String modeStr = fMap.get("mode").toString(); - LegacySQLTypeName type = null; - - if (typeStr.equals("BOOLEAN")) { - type = LegacySQLTypeName.BOOLEAN; - } else if (typeStr.equals("STRING")) { - type = LegacySQLTypeName.STRING; - } else if (typeStr.equals("BYTES")) { - type = LegacySQLTypeName.BYTES; - } else if (typeStr.equals("INTEGER")) { - type = LegacySQLTypeName.INTEGER; - } else if (typeStr.equals("FLOAT")) { - type = LegacySQLTypeName.FLOAT; - } else if (typeStr.equals("TIMESTAMP") || typeStr.equals("DATE") - || typeStr.equals("TIME") || typeStr.equals("DATETIME")) { - type = LegacySQLTypeName.TIMESTAMP; - } else if (typeStr.equals("RECORD")) { - type = LegacySQLTypeName.RECORD; - } - - return Field.newBuilder(nameStr, type).setMode(Field.Mode.valueOf(modeStr)).build(); - } - - public static List listToFields(List m_fields) { - List fields = new ArrayList(m_fields.size()); - for (Map m : m_fields) { - fields.add(mapToField(m)); - } - - return fields; - } - - public static Schema schemaFromString(String schemaStr) { - if (schemaStr == null) { - return null; - } else { - Gson gson = new Gson(); - List fields = gson.fromJson(schemaStr, gsonSchemaType); - return Schema.of(BigQueryUtils.listToFields(fields)); - } - } - -} diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java index 54105d9592..c013dd81f0 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java @@ -52,7 +52,6 @@ import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; @@ -93,7 +92,6 @@ import java.util.concurrent.atomic.AtomicReference; "The processor is record based so the used schema is driven by the RecordReader. Attributes that are not matched to the target schema" + "are skipped. Exactly once delivery semantics are achieved via stream offsets. The Storage Write API is more efficient than the older " + "insertAll method because it uses gRPC streaming rather than REST over HTTP") -@SeeAlso({PutBigQueryBatch.class, PutBigQueryStreaming.class}) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @WritesAttributes({ @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC) diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java deleted file mode 100644 index 6b229fe914..0000000000 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java +++ /dev/null @@ -1,364 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.processors.gcp.bigquery; - -import com.google.cloud.RetryOption; -import com.google.cloud.bigquery.FormatOptions; -import com.google.cloud.bigquery.Job; -import com.google.cloud.bigquery.JobInfo; -import com.google.cloud.bigquery.JobStatistics.LoadStatistics; -import com.google.cloud.bigquery.Schema; -import com.google.cloud.bigquery.TableDataWriteChannel; -import com.google.cloud.bigquery.TableId; -import com.google.cloud.bigquery.WriteChannelConfiguration; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.DeprecationNotice; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.logging.LogLevel; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.gcp.storage.DeleteGCSObject; -import org.apache.nifi.processors.gcp.storage.PutGCSObject; -import org.apache.nifi.util.StringUtils; -import org.threeten.bp.Duration; -import org.threeten.bp.temporal.ChronoUnit; - -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -/** - * A processor for batch loading data into a Google BigQuery table - * @deprecated use {@link PutBigQuery} instead which uses the Write API - */ -@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) -@DeprecationNotice(alternatives = {PutBigQuery.class}, reason = "This processor is deprecated and may be removed in future releases.") -@Tags({ "google", "google cloud", "bq", "bigquery" }) -@CapabilityDescription("Please be aware this processor is deprecated and may be removed in the near future. Use PutBigQuery instead. Batch loads flow files content to a Google BigQuery table.") -@SeeAlso({ PutGCSObject.class, DeleteGCSObject.class }) -@WritesAttributes({ - @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC), - @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC), - @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC), - @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC), - @WritesAttribute(attribute = BigQueryAttributes.JOB_ID_ATTR, description = BigQueryAttributes.JOB_ID_DESC), - @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC), - @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC), - @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC), - @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC) -}) -@Deprecated -public class PutBigQueryBatch extends AbstractBigQueryProcessor { - - private static final List TYPES = Arrays.asList(FormatOptions.json().getType(), FormatOptions.csv().getType(), FormatOptions.avro().getType()); - - private static final Validator FORMAT_VALIDATOR = new Validator() { - @Override - public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - final ValidationResult.Builder builder = new ValidationResult.Builder(); - builder.subject(subject).input(input); - if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { - return builder.valid(true).explanation("Contains Expression Language").build(); - } - - if (TYPES.contains(input.toUpperCase())) { - builder.valid(true); - } else { - builder.valid(false).explanation("Load File Type must be one of the following options: " + StringUtils.join(TYPES, ", ")); - } - - return builder.build(); - } - }; - - public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder() - .name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR) - .displayName("Read Timeout") - .description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC) - .required(true) - .defaultValue("5 minutes") - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); - - public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor.Builder() - .name(BigQueryAttributes.TABLE_SCHEMA_ATTR) - .displayName("Table Schema") - .description(BigQueryAttributes.TABLE_SCHEMA_DESC) - .required(false) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR) - .displayName("Load file type") - .description(BigQueryAttributes.SOURCE_TYPE_DESC) - .required(true) - .addValidator(FORMAT_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .build(); - - public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder() - .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR) - .displayName("Create Disposition") - .description(BigQueryAttributes.CREATE_DISPOSITION_DESC) - .required(true) - .allowableValues(BigQueryAttributes.CREATE_IF_NEEDED, BigQueryAttributes.CREATE_NEVER) - .defaultValue(BigQueryAttributes.CREATE_IF_NEEDED.getValue()) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder() - .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR) - .displayName("Write Disposition") - .description(BigQueryAttributes.WRITE_DISPOSITION_DESC) - .required(true) - .allowableValues(BigQueryAttributes.WRITE_EMPTY, BigQueryAttributes.WRITE_APPEND, BigQueryAttributes.WRITE_TRUNCATE) - .defaultValue(BigQueryAttributes.WRITE_EMPTY.getValue()) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder() - .name(BigQueryAttributes.MAX_BADRECORDS_ATTR) - .displayName("Max Bad Records") - .description(BigQueryAttributes.MAX_BADRECORDS_DESC) - .required(true) - .defaultValue("0") - .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) - .build(); - - public static final PropertyDescriptor CSV_ALLOW_JAGGED_ROWS = new PropertyDescriptor.Builder() - .name(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_ATTR) - .displayName("CSV Input - Allow Jagged Rows") - .description(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_DESC) - .required(true) - .allowableValues("true", "false") - .defaultValue("false") - .build(); - - public static final PropertyDescriptor CSV_ALLOW_QUOTED_NEW_LINES = new PropertyDescriptor.Builder() - .name(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_ATTR) - .displayName("CSV Input - Allow Quoted New Lines") - .description(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_DESC) - .required(true) - .allowableValues("true", "false") - .defaultValue("false") - .build(); - - public static final PropertyDescriptor CSV_CHARSET = new PropertyDescriptor.Builder() - .name(BigQueryAttributes.CSV_CHARSET_ATTR) - .displayName("CSV Input - Character Set") - .description(BigQueryAttributes.CSV_CHARSET_DESC) - .required(true) - .allowableValues("UTF-8", "ISO-8859-1") - .defaultValue("UTF-8") - .build(); - - public static final PropertyDescriptor CSV_FIELD_DELIMITER = new PropertyDescriptor.Builder() - .name(BigQueryAttributes.CSV_FIELD_DELIMITER_ATTR) - .displayName("CSV Input - Field Delimiter") - .description(BigQueryAttributes.CSV_FIELD_DELIMITER_DESC) - .required(true) - .defaultValue(",") - .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .build(); - - public static final PropertyDescriptor CSV_QUOTE = new PropertyDescriptor.Builder() - .name(BigQueryAttributes.CSV_QUOTE_ATTR) - .displayName("CSV Input - Quote") - .description(BigQueryAttributes.CSV_QUOTE_DESC) - .required(true) - .defaultValue("\"") - .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .build(); - - public static final PropertyDescriptor CSV_SKIP_LEADING_ROWS = new PropertyDescriptor.Builder() - .name(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_ATTR) - .displayName("CSV Input - Skip Leading Rows") - .description(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_DESC) - .required(true) - .defaultValue("0") - .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .build(); - - public static final PropertyDescriptor AVRO_USE_LOGICAL_TYPES = new PropertyDescriptor.Builder() - .name(BigQueryAttributes.AVRO_USE_LOGICAL_TYPES_ATTR) - .displayName("Avro Input - Use Logical Types") - .description(BigQueryAttributes.AVRO_USE_LOGICAL_TYPES_DESC) - .required(true) - .allowableValues("true", "false") - .defaultValue("false") - .build(); - - @Override - public List getSupportedPropertyDescriptors() { - final List descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors()); - descriptors.add(TABLE_SCHEMA); - descriptors.add(READ_TIMEOUT); - descriptors.add(SOURCE_TYPE); - descriptors.add(CREATE_DISPOSITION); - descriptors.add(WRITE_DISPOSITION); - descriptors.add(MAXBAD_RECORDS); - descriptors.add(CSV_ALLOW_JAGGED_ROWS); - descriptors.add(CSV_ALLOW_QUOTED_NEW_LINES); - descriptors.add(CSV_CHARSET); - descriptors.add(CSV_FIELD_DELIMITER); - descriptors.add(CSV_QUOTE); - descriptors.add(CSV_SKIP_LEADING_ROWS); - descriptors.add(AVRO_USE_LOGICAL_TYPES); - return Collections.unmodifiableList(descriptors); - } - - @Override - @OnScheduled - public void onScheduled(ProcessContext context) { - super.onScheduled(context); - } - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - final String type = context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions(flowFile).getValue(); - final TableId tableId = getTableId(context, flowFile.getAttributes()); - - try { - - FormatOptions formatOption; - - if (type.equals(FormatOptions.csv().getType())) { - formatOption = FormatOptions.csv().toBuilder() - .setAllowJaggedRows(context.getProperty(CSV_ALLOW_JAGGED_ROWS).asBoolean()) - .setAllowQuotedNewLines(context.getProperty(CSV_ALLOW_QUOTED_NEW_LINES).asBoolean()) - .setEncoding(context.getProperty(CSV_CHARSET).getValue()) - .setFieldDelimiter(context.getProperty(CSV_FIELD_DELIMITER).evaluateAttributeExpressions(flowFile).getValue()) - .setQuote(context.getProperty(CSV_QUOTE).evaluateAttributeExpressions(flowFile).getValue()) - .setSkipLeadingRows(context.getProperty(CSV_SKIP_LEADING_ROWS).evaluateAttributeExpressions(flowFile).asInteger()) - .build(); - } else { - formatOption = FormatOptions.of(type); - } - - final Schema schema = BigQueryUtils.schemaFromString(context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions(flowFile).getValue()); - final WriteChannelConfiguration writeChannelConfiguration = WriteChannelConfiguration.newBuilder(tableId) - .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue())) - .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue())) - .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).evaluateAttributeExpressions(flowFile).asBoolean()) - .setUseAvroLogicalTypes(context.getProperty(AVRO_USE_LOGICAL_TYPES).asBoolean()) - .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger()) - .setSchema(schema) - .setFormatOptions(formatOption) - .build(); - - try (TableDataWriteChannel writer = getCloudService().writer(writeChannelConfiguration)) { - - session.read(flowFile, rawIn -> { - ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn); - ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE); - while (readableByteChannel.read(byteBuffer) >= 0) { - byteBuffer.flip(); - writer.write(byteBuffer); - byteBuffer.clear(); - } - }); - - // writer must be closed to get the job - writer.close(); - - Job job = writer.getJob(); - Long timePeriod = context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.SECONDS); - Duration waitFor = Duration.of(timePeriod, ChronoUnit.SECONDS); - job = job.waitFor(RetryOption.totalTimeout(waitFor)); - - if (job != null) { - final Map attributes = new HashMap<>(); - - attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(job.getStatistics().getCreationTime())); - attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime())); - attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime())); - attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink()); - attributes.put(BigQueryAttributes.JOB_ID_ATTR, job.getJobId().getJob()); - - boolean jobError = (job.getStatus().getError() != null); - - if (jobError) { - attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, job.getStatus().getError().getMessage()); - attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, job.getStatus().getError().getReason()); - attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, job.getStatus().getError().getLocation()); - } else { - // in case it got looped back from error - flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_MSG_ATTR); - flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_REASON_ATTR); - flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR); - - // add the number of records successfully added - if (job.getStatistics() instanceof LoadStatistics) { - final LoadStatistics stats = (LoadStatistics) job.getStatistics(); - attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(stats.getOutputRows())); - } - } - - if (!attributes.isEmpty()) { - flowFile = session.putAllAttributes(flowFile, attributes); - } - - if (jobError) { - getLogger().log(LogLevel.WARN, job.getStatus().getError().getMessage()); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - } else { - session.getProvenanceReporter().send(flowFile, job.getSelfLink(), job.getStatistics().getEndTime() - job.getStatistics().getStartTime()); - session.transfer(flowFile, REL_SUCCESS); - } - } - } - - } catch (Exception ex) { - getLogger().log(LogLevel.ERROR, ex.getMessage(), ex); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - } - } - -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java deleted file mode 100644 index 7e00cc2359..0000000000 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.processors.gcp.bigquery; - -import com.google.cloud.bigquery.BigQueryError; -import com.google.cloud.bigquery.InsertAllRequest; -import com.google.cloud.bigquery.InsertAllResponse; -import com.google.cloud.bigquery.TableId; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.SystemResource; -import org.apache.nifi.annotation.behavior.SystemResourceConsideration; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.DeprecationNotice; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.logging.LogLevel; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.serialization.RecordReader; -import org.apache.nifi.serialization.RecordReaderFactory; -import org.apache.nifi.serialization.record.MapRecord; -import org.apache.nifi.serialization.record.Record; - -import java.io.InputStream; -import java.sql.Date; -import java.sql.Time; -import java.sql.Timestamp; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * A processor for streaming loading data into a Google BigQuery table. It uses the BigQuery - * streaming insert API to insert data. This provides the lowest-latency insert path into BigQuery, - * and therefore is the default method when the input is unbounded. BigQuery will make a strong - * effort to ensure no duplicates when using this path, however there are some scenarios in which - * BigQuery is unable to make this guarantee (see - * https://cloud.google.com/bigquery/streaming-data-into-bigquery). A query can be run over the - * output table to periodically clean these rare duplicates. Alternatively, using the Batch insert - * method does guarantee no duplicates, though the latency for the insert into BigQuery will be much - * higher. - * - * @deprecated use {@link PutBigQuery} instead which uses the Write API - */ -@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) -@DeprecationNotice(alternatives = {PutBigQuery.class}, reason = "This processor is deprecated and may be removed in future releases.") -@Tags({ "google", "google cloud", "bq", "gcp", "bigquery", "record" }) -@CapabilityDescription("Please be aware this processor is deprecated and may be removed in the near future. Use PutBigQuery instead. " - + "Load data into Google BigQuery table using the streaming API. This processor " - + "is not intended to load large flow files as it will load the full content into memory. If " - + "you need to insert large flow files, consider using PutBigQueryBatch instead.") -@SeeAlso({ PutBigQueryBatch.class }) -@SystemResourceConsideration(resource = SystemResource.MEMORY) -@WritesAttributes({ - @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC) -}) -@Deprecated -public class PutBigQueryStreaming extends AbstractBigQueryProcessor { - - private static final DateTimeFormatter timestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); - private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS"); - - public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() - .name(BigQueryAttributes.RECORD_READER_ATTR) - .displayName("Record Reader") - .description(BigQueryAttributes.RECORD_READER_DESC) - .identifiesControllerService(RecordReaderFactory.class) - .required(true) - .build(); - - public static final PropertyDescriptor SKIP_INVALID_ROWS = new PropertyDescriptor.Builder() - .name(BigQueryAttributes.SKIP_INVALID_ROWS_ATTR) - .displayName("Skip Invalid Rows") - .description(BigQueryAttributes.SKIP_INVALID_ROWS_DESC) - .required(true) - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .defaultValue("false") - .build(); - - @Override - public List getSupportedPropertyDescriptors() { - final List descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors()); - descriptors.add(RECORD_READER); - descriptors.add(SKIP_INVALID_ROWS); - return Collections.unmodifiableList(descriptors); - } - - @Override - @OnScheduled - public void onScheduled(ProcessContext context) { - super.onScheduled(context); - } - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); - final TableId tableId = getTableId(context, flowFile.getAttributes()); - - try { - - InsertAllRequest.Builder request = InsertAllRequest.newBuilder(tableId); - int nbrecord = 0; - - try (final InputStream in = session.read(flowFile)) { - final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); - try (final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) { - Record currentRecord; - while ((currentRecord = reader.nextRecord()) != null) { - request.addRow(convertMapRecord(currentRecord.toMap())); - nbrecord++; - } - } - } - - request.setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).evaluateAttributeExpressions(flowFile).asBoolean()); - request.setSkipInvalidRows(context.getProperty(SKIP_INVALID_ROWS).evaluateAttributeExpressions(flowFile).asBoolean()); - - InsertAllResponse response = getCloudService().insertAll(request.build()); - - final Map attributes = new HashMap<>(); - - if (response.hasErrors()) { - getLogger().log(LogLevel.WARN, "Failed to insert {} of {} records into BigQuery {} table.", new Object[] { response.getInsertErrors().size(), nbrecord, tableName }); - if (getLogger().isDebugEnabled()) { - for (long index : response.getInsertErrors().keySet()) { - for (BigQueryError e : response.getInsertErrors().get(index)) { - getLogger().log(LogLevel.DEBUG, "Failed to insert record #{}: {}", new Object[] { index, e.getMessage() }); - } - } - } - - attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(nbrecord - response.getInsertErrors().size())); - - flowFile = session.penalize(flowFile); - flowFile = session.putAllAttributes(flowFile, attributes); - session.transfer(flowFile, REL_FAILURE); - } else { - attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(nbrecord)); - flowFile = session.putAllAttributes(flowFile, attributes); - session.transfer(flowFile, REL_SUCCESS); - } - - } catch (Exception ex) { - getLogger().log(LogLevel.ERROR, ex.getMessage(), ex); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - } - } - - private Map convertMapRecord(Map map) { - Map result = new HashMap(); - for (String key : map.keySet()) { - Object obj = map.get(key); - if (obj instanceof MapRecord) { - result.put(key, convertMapRecord(((MapRecord) obj).toMap())); - } else if (obj instanceof Object[] - && ((Object[]) obj).length > 0 - && ((Object[]) obj)[0] instanceof MapRecord) { - List> lmapr = new ArrayList>(); - for (Object mapr : ((Object[]) obj)) { - lmapr.add(convertMapRecord(((MapRecord) mapr).toMap())); - } - result.put(key, lmapr); - } else if (obj instanceof Timestamp) { - // ZoneOffset.UTC time zone is necessary due to implicit time zone conversion in Record Readers from - // the local system time zone to the GMT time zone - LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Timestamp) obj).getTime()), ZoneOffset.UTC); - result.put(key, dateTime.format(timestampFormatter)); - } else if (obj instanceof Time) { - // ZoneOffset.UTC time zone is necessary due to implicit time zone conversion in Record Readers from - // the local system time zone to the GMT time zone - LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Time) obj).getTime()), ZoneOffset.UTC); - result.put(key, dateTime.format(timeFormatter)); - } else if (obj instanceof Date) { - result.put(key, obj.toString()); - } else { - result.put(key, obj); - } - } - return result; - } - -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 1fe8dd6535..fbcb0d907a 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -21,8 +21,6 @@ org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub org.apache.nifi.processors.gcp.pubsub.lite.PublishGCPubSubLite org.apache.nifi.processors.gcp.pubsub.lite.ConsumeGCPubSubLite org.apache.nifi.processors.gcp.bigquery.PutBigQuery -org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch -org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming org.apache.nifi.processors.gcp.drive.ListGoogleDrive org.apache.nifi.processors.gcp.drive.FetchGoogleDrive org.apache.nifi.processors.gcp.drive.PutGoogleDrive diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java deleted file mode 100644 index 4ad8ae3eb6..0000000000 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.gcp.bigquery; - -import com.google.cloud.RetryOption; -import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.BigQueryException; -import com.google.cloud.bigquery.FormatOptions; -import com.google.cloud.bigquery.Job; -import com.google.cloud.bigquery.JobId; -import com.google.cloud.bigquery.JobInfo; -import com.google.cloud.bigquery.JobStatistics; -import com.google.cloud.bigquery.JobStatus; -import com.google.cloud.bigquery.TableDataWriteChannel; -import com.google.cloud.bigquery.WriteChannelConfiguration; -import org.apache.nifi.components.ConfigVerificationResult; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.VerifiableProcessor; -import org.apache.nifi.util.TestRunner; -import org.junit.jupiter.api.Test; -import org.mockito.ArgumentMatchers; -import org.mockito.Mock; - -import java.util.Collections; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.when; - -/** - * Unit tests for {@link PutBigQueryBatch}. - */ -public class PutBigQueryBatchTest extends AbstractBQTest { - private static final String TABLE_NAME = "test_table"; - private static final String TABLE_SCHEMA = "[{ \"mode\": \"NULLABLE\", \"name\": \"data\", \"type\": \"STRING\" }]"; - private static final String SOURCE_TYPE = FormatOptions.json().getType(); - private static final String CREATE_DISPOSITION = JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(); - private static final String WRITE_DISPOSITION = JobInfo.WriteDisposition.WRITE_EMPTY.name(); - private static final String MAX_BAD_RECORDS = "0"; - private static final String IGNORE_UNKNOWN = "true"; - private static final String READ_TIMEOUT = "5 minutes"; - - @Mock - Job job; - - @Mock - JobId jobId; - - @Mock - JobStatus jobStatus; - - @Mock - JobStatistics stats; - - @Mock - TableDataWriteChannel tableDataWriteChannel; - - @Override - public AbstractBigQueryProcessor getProcessor() { - return new PutBigQueryBatch() { - @Override - protected BigQuery getCloudService() { - return bq; - } - - @Override - protected BigQuery getCloudService(final ProcessContext context) { - return bq; - } - }; - } - - @Override - protected void addRequiredPropertiesToRunner(TestRunner runner) { - runner.setProperty(PutBigQueryBatch.DATASET, DATASET); - runner.setProperty(PutBigQueryBatch.TABLE_NAME, TABLE_NAME); - runner.setProperty(PutBigQueryBatch.TABLE_SCHEMA, TABLE_SCHEMA); - runner.setProperty(PutBigQueryBatch.SOURCE_TYPE, SOURCE_TYPE); - runner.setProperty(PutBigQueryBatch.CREATE_DISPOSITION, CREATE_DISPOSITION); - runner.setProperty(PutBigQueryBatch.WRITE_DISPOSITION, WRITE_DISPOSITION); - runner.setProperty(PutBigQueryBatch.MAXBAD_RECORDS, MAX_BAD_RECORDS); - runner.setProperty(PutBigQueryBatch.IGNORE_UNKNOWN, IGNORE_UNKNOWN); - runner.setProperty(PutBigQueryBatch.READ_TIMEOUT, READ_TIMEOUT); - } - - @Test - public void testSuccessfulLoad() throws Exception { - when(bq.writer(ArgumentMatchers.isA(WriteChannelConfiguration.class))).thenReturn(tableDataWriteChannel); - when(tableDataWriteChannel.getJob()).thenReturn(job); - when(job.waitFor(ArgumentMatchers.isA(RetryOption.class))).thenReturn(job); - when(job.getStatus()).thenReturn(jobStatus); - when(job.getStatistics()).thenReturn(stats); - - when(stats.getCreationTime()).thenReturn(0L); - when(stats.getStartTime()).thenReturn(1L); - when(stats.getEndTime()).thenReturn(2L); - when(job.getJobId()).thenReturn(jobId); - when(jobId.getJob()).thenReturn("job-id"); - - final AbstractBigQueryProcessor processor = getProcessor(); - final TestRunner runner = buildNewRunner(processor); - addRequiredPropertiesToRunner(runner); - runner.assertValid(); - - runner.enqueue("{ \"data\": \"datavalue\" }"); - - runner.run(); - - when(bq.testIamPermissions(any(), any())).thenReturn(Collections.singletonList("permission")); - final List verificationResults = ((VerifiableProcessor) processor).verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap()); - assertEquals(2, verificationResults.size()); - assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, verificationResults.get(1).getOutcome()); - - runner.assertAllFlowFilesTransferred(PutBigQueryBatch.REL_SUCCESS); - } - - @Test - public void testFailedLoad() throws Exception { - when(bq.writer(ArgumentMatchers.isA(WriteChannelConfiguration.class))).thenReturn(tableDataWriteChannel); - when(tableDataWriteChannel.getJob()).thenReturn(job); - when(job.waitFor(ArgumentMatchers.isA(RetryOption.class))).thenThrow(BigQueryException.class); - - final TestRunner runner = buildNewRunner(getProcessor()); - addRequiredPropertiesToRunner(runner); - runner.assertValid(); - - runner.enqueue("{ \"data\": \"datavalue\" }"); - - runner.run(); - - runner.assertAllFlowFilesTransferred(PutBigQueryBatch.REL_FAILURE); - } - - @Test - public void testMandatoryProjectId() throws Exception { - final TestRunner runner = buildNewRunner(getProcessor()); - addRequiredPropertiesToRunner(runner); - runner.assertValid(); - - runner.removeProperty(PutBigQueryBatch.PROJECT_ID); - runner.assertNotValid(); - } -} \ No newline at end of file