NIFI-11657 Removed Deprecated PutBigQueryBatch and PutBigQueryStreaming

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #7351.
This commit is contained in:
exceptionfactory 2023-06-06 14:44:27 -05:00 committed by Pierre Villard
parent 1f1b2f1f28
commit 7485687d4b
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
7 changed files with 0 additions and 924 deletions

View File

@ -17,12 +17,6 @@
package org.apache.nifi.processors.gcp.bigquery; package org.apache.nifi.processors.gcp.bigquery;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
import com.google.cloud.bigquery.JobInfo;
/** /**
* Attributes associated with the BigQuery processors * Attributes associated with the BigQuery processors
*/ */
@ -30,79 +24,19 @@ public class BigQueryAttributes {
private BigQueryAttributes() { private BigQueryAttributes() {
} }
public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
// Properties // Properties
public static final String SOURCE_TYPE_ATTR = "bq.load.type";
public static final String SOURCE_TYPE_DESC = "Data type of the file to be loaded. Possible values: AVRO, "
+ "NEWLINE_DELIMITED_JSON, CSV.";
public static final String IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown"; public static final String IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown";
public static final String IGNORE_UNKNOWN_DESC = "Sets whether BigQuery should allow extra values that are not represented " public static final String IGNORE_UNKNOWN_DESC = "Sets whether BigQuery should allow extra values that are not represented "
+ "in the table schema. If true, the extra values are ignored. If false, records with extra columns are treated as " + "in the table schema. If true, the extra values are ignored. If false, records with extra columns are treated as "
+ "bad records, and if there are too many bad records, an invalid error is returned in the job result. By default " + "bad records, and if there are too many bad records, an invalid error is returned in the job result. By default "
+ "unknown values are not allowed."; + "unknown values are not allowed.";
public static final String WRITE_DISPOSITION_ATTR = "bq.load.write_disposition";
public static final String WRITE_DISPOSITION_DESC = "Sets the action that should occur if the destination table already exists.";
public static final String MAX_BADRECORDS_ATTR = "bq.load.max_badrecords";
public static final String MAX_BADRECORDS_DESC = "Sets the maximum number of bad records that BigQuery can ignore when running "
+ "the job. If the number of bad records exceeds this value, an invalid error is returned in the job result. By default "
+ "no bad record is ignored.";
public static final String DATASET_ATTR = "bq.dataset"; public static final String DATASET_ATTR = "bq.dataset";
public static final String DATASET_DESC = "BigQuery dataset name (Note - The dataset must exist in GCP)"; public static final String DATASET_DESC = "BigQuery dataset name (Note - The dataset must exist in GCP)";
public static final String TABLE_NAME_ATTR = "bq.table.name"; public static final String TABLE_NAME_ATTR = "bq.table.name";
public static final String TABLE_NAME_DESC = "BigQuery table name"; public static final String TABLE_NAME_DESC = "BigQuery table name";
public static final String TABLE_SCHEMA_ATTR = "bq.table.schema";
public static final String TABLE_SCHEMA_DESC = "BigQuery schema in JSON format";
public static final String CREATE_DISPOSITION_ATTR = "bq.load.create_disposition";
public static final String CREATE_DISPOSITION_DESC = "Sets whether the job is allowed to create new tables";
public static final String JOB_READ_TIMEOUT_ATTR = "bq.readtimeout";
public static final String JOB_READ_TIMEOUT_DESC = "Load Job Time Out";
public static final String CSV_ALLOW_JAGGED_ROWS_ATTR = "bq.csv.allow.jagged.rows";
public static final String CSV_ALLOW_JAGGED_ROWS_DESC = "Set whether BigQuery should accept rows that are missing "
+ "trailing optional columns. If true, BigQuery treats missing trailing columns as null values. If false, "
+ "records with missing trailing columns are treated as bad records, and if there are too many bad records, "
+ "an invalid error is returned in the job result. By default, rows with missing trailing columns are "
+ "considered bad records.";
public static final String CSV_ALLOW_QUOTED_NEW_LINES_ATTR = "bq.csv.allow.quoted.new.lines";
public static final String CSV_ALLOW_QUOTED_NEW_LINES_DESC = "Sets whether BigQuery should allow quoted data sections "
+ "that contain newline characters in a CSV file. By default quoted newline are not allowed.";
public static final String CSV_CHARSET_ATTR = "bq.csv.charset";
public static final String CSV_CHARSET_DESC = "Sets the character encoding of the data.";
public static final String CSV_FIELD_DELIMITER_ATTR = "bq.csv.delimiter";
public static final String CSV_FIELD_DELIMITER_DESC = "Sets the separator for fields in a CSV file. BigQuery converts "
+ "the string to ISO-8859-1 encoding, and then uses the first byte of the encoded string to split the data in its "
+ "raw, binary state. BigQuery also supports the escape sequence \"\t\" to specify a tab separator. The default "
+ "value is a comma (',').";
public static final String CSV_QUOTE_ATTR = "bq.csv.quote";
public static final String CSV_QUOTE_DESC = "Sets the value that is used to quote data sections in a CSV file. BigQuery "
+ "converts the string to ISO-8859-1 encoding, and then uses the first byte of the encoded string to split the "
+ "data in its raw, binary state. The default value is a double-quote ('\"'). If your data does not contain quoted "
+ "sections, set the property value to an empty string. If your data contains quoted newline characters, you must "
+ "also set the Allow Quoted New Lines property to true.";
public static final String CSV_SKIP_LEADING_ROWS_ATTR = "bq.csv.skip.leading.rows";
public static final String CSV_SKIP_LEADING_ROWS_DESC = "Sets the number of rows at the top of a CSV file that BigQuery "
+ "will skip when reading the data. The default value is 0. This property is useful if you have header rows in the "
+ "file that should be skipped.";
public static final String AVRO_USE_LOGICAL_TYPES_ATTR = "bq.avro.use.logical.types";
public static final String AVRO_USE_LOGICAL_TYPES_DESC = "If format is set to Avro and if this option is set to true, you "
+ "can interpret logical types into their corresponding types (such as TIMESTAMP) instead of only using their raw "
+ "types (such as INTEGER).";
public static final String RECORD_READER_ATTR = "bq.record.reader"; public static final String RECORD_READER_ATTR = "bq.record.reader";
public static final String RECORD_READER_DESC = "Specifies the Controller Service to use for parsing incoming data."; public static final String RECORD_READER_DESC = "Specifies the Controller Service to use for parsing incoming data.";
@ -111,44 +45,12 @@ public class BigQueryAttributes {
+ "rows exist. If not set the entire insert request will fail if it contains an invalid row."; + "rows exist. If not set the entire insert request will fail if it contains an invalid row.";
// Batch Attributes // Batch Attributes
public static final String JOB_CREATE_TIME_ATTR = "bq.job.stat.creation_time";
public static final String JOB_CREATE_TIME_DESC = "Time load job creation";
public static final String JOB_END_TIME_ATTR = "bq.job.stat.end_time";
public static final String JOB_END_TIME_DESC = "Time load job ended";
public static final String JOB_START_TIME_ATTR = "bq.job.stat.start_time";
public static final String JOB_START_TIME_DESC = "Time load job started";
public static final String JOB_LINK_ATTR = "bq.job.link";
public static final String JOB_LINK_DESC = "API Link to load job";
public static final String JOB_ID_ATTR = "bq.job.id";
public static final String JOB_ID_DESC = "ID of the BigQuery job";
public static final String JOB_NB_RECORDS_ATTR = "bq.records.count"; public static final String JOB_NB_RECORDS_ATTR = "bq.records.count";
public static final String JOB_NB_RECORDS_DESC = "Number of records successfully inserted"; public static final String JOB_NB_RECORDS_DESC = "Number of records successfully inserted";
public static final String JOB_ERROR_MSG_ATTR = "bq.error.message"; public static final String JOB_ERROR_MSG_ATTR = "bq.error.message";
public static final String JOB_ERROR_MSG_DESC = "Load job error message";
public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason"; public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason";
public static final String JOB_ERROR_REASON_DESC = "Load job error reason";
public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location"; public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
public static final String JOB_ERROR_LOCATION_DESC = "Load job error location";
// Allowable values
public static final AllowableValue CREATE_IF_NEEDED = new AllowableValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(),
JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), "Configures the job to create the table if it does not exist.");
public static final AllowableValue CREATE_NEVER = new AllowableValue(JobInfo.CreateDisposition.CREATE_NEVER.name(),
JobInfo.CreateDisposition.CREATE_NEVER.name(), "Configures the job to fail with a not-found error if the table does not exist.");
public static final AllowableValue WRITE_EMPTY = new AllowableValue(JobInfo.WriteDisposition.WRITE_EMPTY.name(),
JobInfo.WriteDisposition.WRITE_EMPTY.name(), "Configures the job to fail with a duplicate error if the table already exists.");
public static final AllowableValue WRITE_APPEND = new AllowableValue(JobInfo.WriteDisposition.WRITE_APPEND.name(),
JobInfo.WriteDisposition.WRITE_APPEND.name(), "Configures the job to append data to the table if it already exists.");
public static final AllowableValue WRITE_TRUNCATE = new AllowableValue(JobInfo.WriteDisposition.WRITE_TRUNCATE.name(),
JobInfo.WriteDisposition.WRITE_TRUNCATE.name(), "Configures the job to overwrite the table data if table already exists.");
} }

View File

@ -1,83 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
/**
* Util class for schema manipulation
*/
public class BigQueryUtils {
private final static Type gsonSchemaType = new TypeToken<List<Map>>() { }.getType();
public static Field mapToField(Map fMap) {
String typeStr = fMap.get("type").toString();
String nameStr = fMap.get("name").toString();
String modeStr = fMap.get("mode").toString();
LegacySQLTypeName type = null;
if (typeStr.equals("BOOLEAN")) {
type = LegacySQLTypeName.BOOLEAN;
} else if (typeStr.equals("STRING")) {
type = LegacySQLTypeName.STRING;
} else if (typeStr.equals("BYTES")) {
type = LegacySQLTypeName.BYTES;
} else if (typeStr.equals("INTEGER")) {
type = LegacySQLTypeName.INTEGER;
} else if (typeStr.equals("FLOAT")) {
type = LegacySQLTypeName.FLOAT;
} else if (typeStr.equals("TIMESTAMP") || typeStr.equals("DATE")
|| typeStr.equals("TIME") || typeStr.equals("DATETIME")) {
type = LegacySQLTypeName.TIMESTAMP;
} else if (typeStr.equals("RECORD")) {
type = LegacySQLTypeName.RECORD;
}
return Field.newBuilder(nameStr, type).setMode(Field.Mode.valueOf(modeStr)).build();
}
public static List<Field> listToFields(List<Map> m_fields) {
List<Field> fields = new ArrayList(m_fields.size());
for (Map m : m_fields) {
fields.add(mapToField(m));
}
return fields;
}
public static Schema schemaFromString(String schemaStr) {
if (schemaStr == null) {
return null;
} else {
Gson gson = new Gson();
List<Map> fields = gson.fromJson(schemaStr, gsonSchemaType);
return Schema.of(BigQueryUtils.listToFields(fields));
}
}
}

View File

@ -52,7 +52,6 @@ import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@ -93,7 +92,6 @@ import java.util.concurrent.atomic.AtomicReference;
"The processor is record based so the used schema is driven by the RecordReader. Attributes that are not matched to the target schema" + "The processor is record based so the used schema is driven by the RecordReader. Attributes that are not matched to the target schema" +
"are skipped. Exactly once delivery semantics are achieved via stream offsets. The Storage Write API is more efficient than the older " + "are skipped. Exactly once delivery semantics are achieved via stream offsets. The Storage Write API is more efficient than the older " +
"insertAll method because it uses gRPC streaming rather than REST over HTTP") "insertAll method because it uses gRPC streaming rather than REST over HTTP")
@SeeAlso({PutBigQueryBatch.class, PutBigQueryStreaming.class})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC) @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)

View File

@ -1,364 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery;
import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
import org.apache.nifi.processors.gcp.storage.PutGCSObject;
import org.apache.nifi.util.StringUtils;
import org.threeten.bp.Duration;
import org.threeten.bp.temporal.ChronoUnit;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* A processor for batch loading data into a Google BigQuery table
* @deprecated use {@link PutBigQuery} instead which uses the Write API
*/
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@DeprecationNotice(alternatives = {PutBigQuery.class}, reason = "This processor is deprecated and may be removed in future releases.")
@Tags({ "google", "google cloud", "bq", "bigquery" })
@CapabilityDescription("Please be aware this processor is deprecated and may be removed in the near future. Use PutBigQuery instead. Batch loads flow files content to a Google BigQuery table.")
@SeeAlso({ PutGCSObject.class, DeleteGCSObject.class })
@WritesAttributes({
@WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_ID_ATTR, description = BigQueryAttributes.JOB_ID_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
})
@Deprecated
public class PutBigQueryBatch extends AbstractBigQueryProcessor {
private static final List<String> TYPES = Arrays.asList(FormatOptions.json().getType(), FormatOptions.csv().getType(), FormatOptions.avro().getType());
private static final Validator FORMAT_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
final ValidationResult.Builder builder = new ValidationResult.Builder();
builder.subject(subject).input(input);
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return builder.valid(true).explanation("Contains Expression Language").build();
}
if (TYPES.contains(input.toUpperCase())) {
builder.valid(true);
} else {
builder.valid(false).explanation("Load File Type must be one of the following options: " + StringUtils.join(TYPES, ", "));
}
return builder.build();
}
};
public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
.displayName("Read Timeout")
.description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC)
.required(true)
.defaultValue("5 minutes")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
.displayName("Table Schema")
.description(BigQueryAttributes.TABLE_SCHEMA_DESC)
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
.displayName("Load file type")
.description(BigQueryAttributes.SOURCE_TYPE_DESC)
.required(true)
.addValidator(FORMAT_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
.displayName("Create Disposition")
.description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
.required(true)
.allowableValues(BigQueryAttributes.CREATE_IF_NEEDED, BigQueryAttributes.CREATE_NEVER)
.defaultValue(BigQueryAttributes.CREATE_IF_NEEDED.getValue())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.WRITE_DISPOSITION_ATTR)
.displayName("Write Disposition")
.description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
.required(true)
.allowableValues(BigQueryAttributes.WRITE_EMPTY, BigQueryAttributes.WRITE_APPEND, BigQueryAttributes.WRITE_TRUNCATE)
.defaultValue(BigQueryAttributes.WRITE_EMPTY.getValue())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.MAX_BADRECORDS_ATTR)
.displayName("Max Bad Records")
.description(BigQueryAttributes.MAX_BADRECORDS_DESC)
.required(true)
.defaultValue("0")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor CSV_ALLOW_JAGGED_ROWS = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_ATTR)
.displayName("CSV Input - Allow Jagged Rows")
.description(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_DESC)
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
public static final PropertyDescriptor CSV_ALLOW_QUOTED_NEW_LINES = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_ATTR)
.displayName("CSV Input - Allow Quoted New Lines")
.description(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_DESC)
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
public static final PropertyDescriptor CSV_CHARSET = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.CSV_CHARSET_ATTR)
.displayName("CSV Input - Character Set")
.description(BigQueryAttributes.CSV_CHARSET_DESC)
.required(true)
.allowableValues("UTF-8", "ISO-8859-1")
.defaultValue("UTF-8")
.build();
public static final PropertyDescriptor CSV_FIELD_DELIMITER = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.CSV_FIELD_DELIMITER_ATTR)
.displayName("CSV Input - Field Delimiter")
.description(BigQueryAttributes.CSV_FIELD_DELIMITER_DESC)
.required(true)
.defaultValue(",")
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor CSV_QUOTE = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.CSV_QUOTE_ATTR)
.displayName("CSV Input - Quote")
.description(BigQueryAttributes.CSV_QUOTE_DESC)
.required(true)
.defaultValue("\"")
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor CSV_SKIP_LEADING_ROWS = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_ATTR)
.displayName("CSV Input - Skip Leading Rows")
.description(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_DESC)
.required(true)
.defaultValue("0")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor AVRO_USE_LOGICAL_TYPES = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.AVRO_USE_LOGICAL_TYPES_ATTR)
.displayName("Avro Input - Use Logical Types")
.description(BigQueryAttributes.AVRO_USE_LOGICAL_TYPES_DESC)
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
descriptors.add(TABLE_SCHEMA);
descriptors.add(READ_TIMEOUT);
descriptors.add(SOURCE_TYPE);
descriptors.add(CREATE_DISPOSITION);
descriptors.add(WRITE_DISPOSITION);
descriptors.add(MAXBAD_RECORDS);
descriptors.add(CSV_ALLOW_JAGGED_ROWS);
descriptors.add(CSV_ALLOW_QUOTED_NEW_LINES);
descriptors.add(CSV_CHARSET);
descriptors.add(CSV_FIELD_DELIMITER);
descriptors.add(CSV_QUOTE);
descriptors.add(CSV_SKIP_LEADING_ROWS);
descriptors.add(AVRO_USE_LOGICAL_TYPES);
return Collections.unmodifiableList(descriptors);
}
@Override
@OnScheduled
public void onScheduled(ProcessContext context) {
super.onScheduled(context);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final String type = context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions(flowFile).getValue();
final TableId tableId = getTableId(context, flowFile.getAttributes());
try {
FormatOptions formatOption;
if (type.equals(FormatOptions.csv().getType())) {
formatOption = FormatOptions.csv().toBuilder()
.setAllowJaggedRows(context.getProperty(CSV_ALLOW_JAGGED_ROWS).asBoolean())
.setAllowQuotedNewLines(context.getProperty(CSV_ALLOW_QUOTED_NEW_LINES).asBoolean())
.setEncoding(context.getProperty(CSV_CHARSET).getValue())
.setFieldDelimiter(context.getProperty(CSV_FIELD_DELIMITER).evaluateAttributeExpressions(flowFile).getValue())
.setQuote(context.getProperty(CSV_QUOTE).evaluateAttributeExpressions(flowFile).getValue())
.setSkipLeadingRows(context.getProperty(CSV_SKIP_LEADING_ROWS).evaluateAttributeExpressions(flowFile).asInteger())
.build();
} else {
formatOption = FormatOptions.of(type);
}
final Schema schema = BigQueryUtils.schemaFromString(context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions(flowFile).getValue());
final WriteChannelConfiguration writeChannelConfiguration = WriteChannelConfiguration.newBuilder(tableId)
.setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
.setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
.setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).evaluateAttributeExpressions(flowFile).asBoolean())
.setUseAvroLogicalTypes(context.getProperty(AVRO_USE_LOGICAL_TYPES).asBoolean())
.setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
.setSchema(schema)
.setFormatOptions(formatOption)
.build();
try (TableDataWriteChannel writer = getCloudService().writer(writeChannelConfiguration)) {
session.read(flowFile, rawIn -> {
ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn);
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
while (readableByteChannel.read(byteBuffer) >= 0) {
byteBuffer.flip();
writer.write(byteBuffer);
byteBuffer.clear();
}
});
// writer must be closed to get the job
writer.close();
Job job = writer.getJob();
Long timePeriod = context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.SECONDS);
Duration waitFor = Duration.of(timePeriod, ChronoUnit.SECONDS);
job = job.waitFor(RetryOption.totalTimeout(waitFor));
if (job != null) {
final Map<String, String> attributes = new HashMap<>();
attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(job.getStatistics().getCreationTime()));
attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(job.getStatistics().getEndTime()));
attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(job.getStatistics().getStartTime()));
attributes.put(BigQueryAttributes.JOB_LINK_ATTR, job.getSelfLink());
attributes.put(BigQueryAttributes.JOB_ID_ATTR, job.getJobId().getJob());
boolean jobError = (job.getStatus().getError() != null);
if (jobError) {
attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, job.getStatus().getError().getMessage());
attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, job.getStatus().getError().getReason());
attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, job.getStatus().getError().getLocation());
} else {
// in case it got looped back from error
flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_MSG_ATTR);
flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_REASON_ATTR);
flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
// add the number of records successfully added
if (job.getStatistics() instanceof LoadStatistics) {
final LoadStatistics stats = (LoadStatistics) job.getStatistics();
attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(stats.getOutputRows()));
}
}
if (!attributes.isEmpty()) {
flowFile = session.putAllAttributes(flowFile, attributes);
}
if (jobError) {
getLogger().log(LogLevel.WARN, job.getStatus().getError().getMessage());
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
} else {
session.getProvenanceReporter().send(flowFile, job.getSelfLink(), job.getStatistics().getEndTime() - job.getStatistics().getStartTime());
session.transfer(flowFile, REL_SUCCESS);
}
}
}
} catch (Exception ex) {
getLogger().log(LogLevel.ERROR, ex.getMessage(), ex);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
}

View File

@ -1,217 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import java.io.InputStream;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* A processor for streaming loading data into a Google BigQuery table. It uses the BigQuery
* streaming insert API to insert data. This provides the lowest-latency insert path into BigQuery,
* and therefore is the default method when the input is unbounded. BigQuery will make a strong
* effort to ensure no duplicates when using this path, however there are some scenarios in which
* BigQuery is unable to make this guarantee (see
* https://cloud.google.com/bigquery/streaming-data-into-bigquery). A query can be run over the
* output table to periodically clean these rare duplicates. Alternatively, using the Batch insert
* method does guarantee no duplicates, though the latency for the insert into BigQuery will be much
* higher.
*
* @deprecated use {@link PutBigQuery} instead which uses the Write API
*/
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@DeprecationNotice(alternatives = {PutBigQuery.class}, reason = "This processor is deprecated and may be removed in future releases.")
@Tags({ "google", "google cloud", "bq", "gcp", "bigquery", "record" })
@CapabilityDescription("Please be aware this processor is deprecated and may be removed in the near future. Use PutBigQuery instead. "
+ "Load data into Google BigQuery table using the streaming API. This processor "
+ "is not intended to load large flow files as it will load the full content into memory. If "
+ "you need to insert large flow files, consider using PutBigQueryBatch instead.")
@SeeAlso({ PutBigQueryBatch.class })
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@WritesAttributes({
@WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
})
@Deprecated
public class PutBigQueryStreaming extends AbstractBigQueryProcessor {
private static final DateTimeFormatter timestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS");
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.RECORD_READER_ATTR)
.displayName("Record Reader")
.description(BigQueryAttributes.RECORD_READER_DESC)
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();
public static final PropertyDescriptor SKIP_INVALID_ROWS = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.SKIP_INVALID_ROWS_ATTR)
.displayName("Skip Invalid Rows")
.description(BigQueryAttributes.SKIP_INVALID_ROWS_DESC)
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("false")
.build();
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
descriptors.add(RECORD_READER);
descriptors.add(SKIP_INVALID_ROWS);
return Collections.unmodifiableList(descriptors);
}
@Override
@OnScheduled
public void onScheduled(ProcessContext context) {
super.onScheduled(context);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final TableId tableId = getTableId(context, flowFile.getAttributes());
try {
InsertAllRequest.Builder request = InsertAllRequest.newBuilder(tableId);
int nbrecord = 0;
try (final InputStream in = session.read(flowFile)) {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
try (final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
Record currentRecord;
while ((currentRecord = reader.nextRecord()) != null) {
request.addRow(convertMapRecord(currentRecord.toMap()));
nbrecord++;
}
}
}
request.setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).evaluateAttributeExpressions(flowFile).asBoolean());
request.setSkipInvalidRows(context.getProperty(SKIP_INVALID_ROWS).evaluateAttributeExpressions(flowFile).asBoolean());
InsertAllResponse response = getCloudService().insertAll(request.build());
final Map<String, String> attributes = new HashMap<>();
if (response.hasErrors()) {
getLogger().log(LogLevel.WARN, "Failed to insert {} of {} records into BigQuery {} table.", new Object[] { response.getInsertErrors().size(), nbrecord, tableName });
if (getLogger().isDebugEnabled()) {
for (long index : response.getInsertErrors().keySet()) {
for (BigQueryError e : response.getInsertErrors().get(index)) {
getLogger().log(LogLevel.DEBUG, "Failed to insert record #{}: {}", new Object[] { index, e.getMessage() });
}
}
}
attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(nbrecord - response.getInsertErrors().size()));
flowFile = session.penalize(flowFile);
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_FAILURE);
} else {
attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(nbrecord));
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
}
} catch (Exception ex) {
getLogger().log(LogLevel.ERROR, ex.getMessage(), ex);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
private Map<String, Object> convertMapRecord(Map<String, Object> map) {
Map<String, Object> result = new HashMap<String, Object>();
for (String key : map.keySet()) {
Object obj = map.get(key);
if (obj instanceof MapRecord) {
result.put(key, convertMapRecord(((MapRecord) obj).toMap()));
} else if (obj instanceof Object[]
&& ((Object[]) obj).length > 0
&& ((Object[]) obj)[0] instanceof MapRecord) {
List<Map<String, Object>> lmapr = new ArrayList<Map<String, Object>>();
for (Object mapr : ((Object[]) obj)) {
lmapr.add(convertMapRecord(((MapRecord) mapr).toMap()));
}
result.put(key, lmapr);
} else if (obj instanceof Timestamp) {
// ZoneOffset.UTC time zone is necessary due to implicit time zone conversion in Record Readers from
// the local system time zone to the GMT time zone
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Timestamp) obj).getTime()), ZoneOffset.UTC);
result.put(key, dateTime.format(timestampFormatter));
} else if (obj instanceof Time) {
// ZoneOffset.UTC time zone is necessary due to implicit time zone conversion in Record Readers from
// the local system time zone to the GMT time zone
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Time) obj).getTime()), ZoneOffset.UTC);
result.put(key, dateTime.format(timeFormatter));
} else if (obj instanceof Date) {
result.put(key, obj.toString());
} else {
result.put(key, obj);
}
}
return result;
}
}

View File

@ -21,8 +21,6 @@ org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
org.apache.nifi.processors.gcp.pubsub.lite.PublishGCPubSubLite org.apache.nifi.processors.gcp.pubsub.lite.PublishGCPubSubLite
org.apache.nifi.processors.gcp.pubsub.lite.ConsumeGCPubSubLite org.apache.nifi.processors.gcp.pubsub.lite.ConsumeGCPubSubLite
org.apache.nifi.processors.gcp.bigquery.PutBigQuery org.apache.nifi.processors.gcp.bigquery.PutBigQuery
org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming
org.apache.nifi.processors.gcp.drive.ListGoogleDrive org.apache.nifi.processors.gcp.drive.ListGoogleDrive
org.apache.nifi.processors.gcp.drive.FetchGoogleDrive org.apache.nifi.processors.gcp.drive.FetchGoogleDrive
org.apache.nifi.processors.gcp.drive.PutGoogleDrive org.apache.nifi.processors.gcp.drive.PutGoogleDrive

View File

@ -1,158 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery;
import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.JobStatus;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
/**
* Unit tests for {@link PutBigQueryBatch}.
*/
public class PutBigQueryBatchTest extends AbstractBQTest {
private static final String TABLE_NAME = "test_table";
private static final String TABLE_SCHEMA = "[{ \"mode\": \"NULLABLE\", \"name\": \"data\", \"type\": \"STRING\" }]";
private static final String SOURCE_TYPE = FormatOptions.json().getType();
private static final String CREATE_DISPOSITION = JobInfo.CreateDisposition.CREATE_IF_NEEDED.name();
private static final String WRITE_DISPOSITION = JobInfo.WriteDisposition.WRITE_EMPTY.name();
private static final String MAX_BAD_RECORDS = "0";
private static final String IGNORE_UNKNOWN = "true";
private static final String READ_TIMEOUT = "5 minutes";
@Mock
Job job;
@Mock
JobId jobId;
@Mock
JobStatus jobStatus;
@Mock
JobStatistics stats;
@Mock
TableDataWriteChannel tableDataWriteChannel;
@Override
public AbstractBigQueryProcessor getProcessor() {
return new PutBigQueryBatch() {
@Override
protected BigQuery getCloudService() {
return bq;
}
@Override
protected BigQuery getCloudService(final ProcessContext context) {
return bq;
}
};
}
@Override
protected void addRequiredPropertiesToRunner(TestRunner runner) {
runner.setProperty(PutBigQueryBatch.DATASET, DATASET);
runner.setProperty(PutBigQueryBatch.TABLE_NAME, TABLE_NAME);
runner.setProperty(PutBigQueryBatch.TABLE_SCHEMA, TABLE_SCHEMA);
runner.setProperty(PutBigQueryBatch.SOURCE_TYPE, SOURCE_TYPE);
runner.setProperty(PutBigQueryBatch.CREATE_DISPOSITION, CREATE_DISPOSITION);
runner.setProperty(PutBigQueryBatch.WRITE_DISPOSITION, WRITE_DISPOSITION);
runner.setProperty(PutBigQueryBatch.MAXBAD_RECORDS, MAX_BAD_RECORDS);
runner.setProperty(PutBigQueryBatch.IGNORE_UNKNOWN, IGNORE_UNKNOWN);
runner.setProperty(PutBigQueryBatch.READ_TIMEOUT, READ_TIMEOUT);
}
@Test
public void testSuccessfulLoad() throws Exception {
when(bq.writer(ArgumentMatchers.isA(WriteChannelConfiguration.class))).thenReturn(tableDataWriteChannel);
when(tableDataWriteChannel.getJob()).thenReturn(job);
when(job.waitFor(ArgumentMatchers.isA(RetryOption.class))).thenReturn(job);
when(job.getStatus()).thenReturn(jobStatus);
when(job.getStatistics()).thenReturn(stats);
when(stats.getCreationTime()).thenReturn(0L);
when(stats.getStartTime()).thenReturn(1L);
when(stats.getEndTime()).thenReturn(2L);
when(job.getJobId()).thenReturn(jobId);
when(jobId.getJob()).thenReturn("job-id");
final AbstractBigQueryProcessor processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
runner.assertValid();
runner.enqueue("{ \"data\": \"datavalue\" }");
runner.run();
when(bq.testIamPermissions(any(), any())).thenReturn(Collections.singletonList("permission"));
final List<ConfigVerificationResult> verificationResults = ((VerifiableProcessor) processor).verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
assertEquals(2, verificationResults.size());
assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, verificationResults.get(1).getOutcome());
runner.assertAllFlowFilesTransferred(PutBigQueryBatch.REL_SUCCESS);
}
@Test
public void testFailedLoad() throws Exception {
when(bq.writer(ArgumentMatchers.isA(WriteChannelConfiguration.class))).thenReturn(tableDataWriteChannel);
when(tableDataWriteChannel.getJob()).thenReturn(job);
when(job.waitFor(ArgumentMatchers.isA(RetryOption.class))).thenThrow(BigQueryException.class);
final TestRunner runner = buildNewRunner(getProcessor());
addRequiredPropertiesToRunner(runner);
runner.assertValid();
runner.enqueue("{ \"data\": \"datavalue\" }");
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQueryBatch.REL_FAILURE);
}
@Test
public void testMandatoryProjectId() throws Exception {
final TestRunner runner = buildNewRunner(getProcessor());
addRequiredPropertiesToRunner(runner);
runner.assertValid();
runner.removeProperty(PutBigQueryBatch.PROJECT_ID);
runner.assertNotValid();
}
}