From b12a9ad446773f9f043dfef10327691bf3963a07 Mon Sep 17 00:00:00 2001
From: Pierre Villard
Date: Wed, 27 Mar 2019 16:52:47 +0100
Subject: [PATCH] NIFI-6159 - Add BigQuery processor using the Streaming API

Signed-off-by: Pierre Villard

This closes #3394.
---
 .../nifi-gcp-processors/pom.xml               |  20 ++
 .../bigquery/AbstractBigQueryProcessor.java   |  73 +++----
 .../gcp/bigquery/BigQueryAttributes.java      |  15 +-
 .../gcp/bigquery/PutBigQueryBatch.java        | 120 +++++------
 .../gcp/bigquery/PutBigQueryStreaming.java    | 201 ++++++++++++++++++
 .../org.apache.nifi.processor.Processor       |   3 +-
 .../gcp/bigquery/AbstractBigQueryIT.java      |  52 +++--
 .../gcp/bigquery/PutBigQueryBatchIT.java      |  22 +-
 .../gcp/bigquery/PutBigQueryStreamingIT.java  | 184 ++++++++++++++++
 .../bigquery/streaming-bad-data.json          |  35 +++
 .../bigquery/streaming-correct-data.json      |  36 ++++
 nifi-nar-bundles/nifi-gcp-bundle/pom.xml      |   2 +-
 12 files changed, 628 insertions(+), 135 deletions(-)
 create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
 create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamingIT.java
 create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-bad-data.json
 create mode 100644 nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data.json

diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index 5d21ef3316..2455891f21 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -38,6 +38,18 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-proxy-configuration-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-processor-utils</artifactId>
@@ -88,6 +100,12 @@
             <version>1.10.0-SNAPSHOT</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-services</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
@@ -100,6 +118,8 @@
                         <exclude>src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker</exclude>
                        <exclude>src/test/resources/mock-gcp-service-account.json</exclude>
                         <exclude>src/test/resources/mock-gcp-application-default-credentials.json</exclude>
+                        <exclude>src/test/resources/bigquery/streaming-bad-data.json</exclude>
+                        <exclude>src/test/resources/bigquery/streaming-correct-data.json</exclude>
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
index 375106051f..b2dc43a1a4 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
@@ -1,12 +1,12 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,11 +17,13 @@
 
 package org.apache.nifi.processors.gcp.bigquery;
 
-import com.google.api.gax.retrying.RetrySettings;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.bigquery.BigQuery;
-import com.google.cloud.bigquery.BigQueryOptions;
-import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -35,13 +37,11 @@ import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
 import org.apache.nifi.proxy.ProxyConfiguration;
 import org.apache.nifi.util.StringUtils;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.common.collect.ImmutableList;
 
 /**
  * Base class for creating processors that connect to the GCP BigQuery service
  */
@@ -50,14 +50,12 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<BigQuery, BigQueryOptions> {
     public static final Set<Relationship> relationships = Collections.unmodifiableSet(
             new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
@@ -82,23 +80,14 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<BigQuery, BigQueryOptions> {
+    public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
+            .displayName("Ignore Unknown Values")
+            .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
 
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return ImmutableList.builder()
+        return ImmutableList.<PropertyDescriptor> builder()
                 .addAll(super.getSupportedPropertyDescriptors())
                 .add(DATASET)
                 .add(TABLE_NAME)
-                .add(TABLE_SCHEMA)
-                .add(READ_TIMEOUT)
+                .add(IGNORE_UNKNOWN)
                 .build();
     }
@@ -153,7 +141,8 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<BigQuery, BigQueryOptions> {
     protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
     }
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
index 81978eb2c8..4a379c08db 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
@@ -1,12 +1,12 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -27,7 +27,8 @@ import com.google.cloud.bigquery.JobInfo;
  * Attributes associated with the BigQuery processors
  */
 public class BigQueryAttributes {
-    private BigQueryAttributes() {}
+    private BigQueryAttributes() {
+    }
 
     public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
@@ -102,7 +103,12 @@ public class BigQueryAttributes {
             + "can interpret logical types into their corresponding types (such as TIMESTAMP) instead of only using their raw "
             + "types (such as INTEGER).";
 
+    public static final String RECORD_READER_ATTR = "bq.record.reader";
+    public static final String RECORD_READER_DESC = "Specifies the Controller Service to use for parsing incoming data.";
 
+    public static final String SKIP_INVALID_ROWS_ATTR = "bq.skip.invalid.rows";
+    public static final String SKIP_INVALID_ROWS_DESC = "Sets whether to insert all valid rows of a request, even if invalid "
+            + "rows exist. If not set, the entire insert request will fail if it contains an invalid row.";
 
     // Batch Attributes
     public static final String JOB_CREATE_TIME_ATTR = "bq.job.stat.creation_time";
@@ -129,7 +135,6 @@ public class BigQueryAttributes {
     public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
     public static final String JOB_ERROR_LOCATION_DESC = "Load job error location";
 
-
     // Allowable values
     public static final AllowableValue CREATE_IF_NEEDED = new AllowableValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(),
             JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), "Configures the job to create the table if it does not exist.");
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
index 5446c20ba7..cd3cb09427 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
@@ -1,12 +1,12 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,16 +17,14 @@
 
 package org.apache.nifi.processors.gcp.bigquery;
 
-import com.google.cloud.RetryOption;
-import com.google.cloud.bigquery.FormatOptions;
-import com.google.cloud.bigquery.Job;
-import com.google.cloud.bigquery.JobInfo;
-import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
-import com.google.cloud.bigquery.Schema;
-import com.google.cloud.bigquery.TableDataWriteChannel;
-import com.google.cloud.bigquery.TableId;
-import com.google.cloud.bigquery.WriteChannelConfiguration;
-import com.google.common.collect.ImmutableList;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -52,39 +50,33 @@ import org.apache.nifi.util.StringUtils;
 import org.threeten.bp.Duration;
 import org.threeten.bp.temporal.ChronoUnit;
 
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import com.google.cloud.RetryOption;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.TableDataWriteChannel;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.WriteChannelConfiguration;
+import com.google.common.collect.ImmutableList;
 
 /**
  * A processor for batch loading data into a Google BigQuery table
  */
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@Tags({"google", "google cloud", "bq", "bigquery"})
+@Tags({ "google", "google cloud", "bq", "bigquery" })
 @CapabilityDescription("Batch loads flow files content to a Google BigQuery table.")
-@SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
+@SeeAlso({ PutGCSObject.class, DeleteGCSObject.class })
 @WritesAttributes({
-    @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC),
-    @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
 })
 public class PutBigQueryBatch extends AbstractBigQueryProcessor {
@@ -99,7 +91,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
                 return builder.valid(true).explanation("Contains Expression Language").build();
             }
 
-            if(TYPES.contains(input.toUpperCase())) {
+            if (TYPES.contains(input.toUpperCase())) {
                 builder.valid(true);
             } else {
                 builder.valid(false).explanation("Load File Type must be one of the following options: " + StringUtils.join(TYPES, ", "));
@@ -109,8 +101,26 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
         }
     };
 
-    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
-            .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
+    public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
+            .displayName("Read Timeout")
+            .description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC)
+            .required(true)
+            .defaultValue("5 minutes")
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
+            .displayName("Table Schema")
+            .description(BigQueryAttributes.TABLE_SCHEMA_DESC)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
             .displayName("Load file type")
             .description(BigQueryAttributes.SOURCE_TYPE_DESC)
             .required(true)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
@@ -118,16 +128,6 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
-    public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
-            .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
-            .displayName("Ignore Unknown Values")
-            .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
-            .required(true)
-            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .build();
-
     public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
             .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
             .displayName("Create Disposition")
@@ -225,13 +225,14 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
 
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return ImmutableList.builder()
+        return ImmutableList.<PropertyDescriptor> builder()
                 .addAll(super.getSupportedPropertyDescriptors())
+                .add(TABLE_SCHEMA)
+                .add(READ_TIMEOUT)
                 .add(SOURCE_TYPE)
                 .add(CREATE_DISPOSITION)
                 .add(WRITE_DISPOSITION)
                 .add(MAXBAD_RECORDS)
-                .add(IGNORE_UNKNOWN)
                 .add(CSV_ALLOW_JAGGED_ROWS)
                 .add(CSV_ALLOW_QUOTED_NEW_LINES)
                 .add(CSV_CHARSET)
@@ -271,7 +272,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
 
             FormatOptions formatOption;
 
-            if(type.equals(FormatOptions.csv().getType())) {
+            if (type.equals(FormatOptions.csv().getType())) {
                 formatOption = FormatOptions.csv().toBuilder()
                         .setAllowJaggedRows(context.getProperty(CSV_ALLOW_JAGGED_ROWS).asBoolean())
                         .setAllowQuotedNewLines(context.getProperty(CSV_ALLOW_QUOTED_NEW_LINES).asBoolean())
@@ -285,18 +286,17 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             }
 
             final Schema schema = BigQueryUtils.schemaFromString(context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions(flowFile).getValue());
-            final WriteChannelConfiguration writeChannelConfiguration =
-                    WriteChannelConfiguration.newBuilder(tableId)
+            final WriteChannelConfiguration writeChannelConfiguration = WriteChannelConfiguration.newBuilder(tableId)
                     .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
                     .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
-                    .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
+                    .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).evaluateAttributeExpressions(flowFile).asBoolean())
                    .setUseAvroLogicalTypes(context.getProperty(AVRO_USE_LOGICAL_TYPES).asBoolean())
                    .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
                    .setSchema(schema)
                    .setFormatOptions(formatOption)
                    .build();
 
-            try ( TableDataWriteChannel writer = getCloudService().writer(writeChannelConfiguration) ) {
+            try (TableDataWriteChannel writer = getCloudService().writer(writeChannelConfiguration)) {
 
                 session.read(flowFile, rawIn -> {
                     ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn);
@@ -337,7 +337,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
                     flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
 
                     // add the number of records successfully added
-                    if(job.getStatistics() instanceof LoadStatistics) {
+                    if (job.getStatistics() instanceof LoadStatistics) {
                         final LoadStatistics stats = (LoadStatistics) job.getStatistics();
                         attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(stats.getOutputRows()));
                     }
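For reference, the google-cloud-bigquery call sequence that PutBigQueryBatch drives above (writer(), WriteChannelConfiguration, Job.waitFor()) can be exercised standalone. The sketch below is illustrative only: "my_dataset"/"my_table" are placeholder names, credentials come from the client's application-default lookup rather than the GCPCredentialsService the processor uses, and schema autodetection stands in for the Table Schema property.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.FormatOptions;
    import com.google.cloud.bigquery.Job;
    import com.google.cloud.bigquery.JobInfo;
    import com.google.cloud.bigquery.TableDataWriteChannel;
    import com.google.cloud.bigquery.TableId;
    import com.google.cloud.bigquery.WriteChannelConfiguration;

    public class BatchLoadSketch {
        public static void main(String[] args) throws Exception {
            // Application-default credentials; the processor resolves these via its controller service.
            BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();

            // Placeholder target table; the processor builds this from its Dataset/Table Name properties.
            TableId tableId = TableId.of("my_dataset", "my_table");

            WriteChannelConfiguration config = WriteChannelConfiguration.newBuilder(tableId)
                    .setFormatOptions(FormatOptions.json())
                    .setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
                    .setAutodetect(true) // stands in for the Table Schema property in this sketch
                    .build();

            // Stream the payload into the write channel, as onTrigger() does with the flow file content.
            TableDataWriteChannel writer = bigQuery.writer(config);
            try {
                writer.write(ByteBuffer.wrap("{\"id\": 1, \"name\": \"John Doe\"}\n".getBytes(StandardCharsets.UTF_8)));
            } finally {
                writer.close();
            }

            // Closing the channel starts the load job; the processor polls it with waitFor(...RetryOption...).
            Job job = writer.getJob().waitFor();
            System.out.println("Load job state: " + job.getStatus().getState());
        }
    }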
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
new file mode 100644
index 0000000000..98457a38e6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.StringUtils;
+
+import com.google.cloud.bigquery.BigQueryError;
+import com.google.cloud.bigquery.InsertAllRequest;
+import com.google.cloud.bigquery.InsertAllResponse;
+import com.google.cloud.bigquery.TableId;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * A processor for streaming data into a Google BigQuery table. It uses the BigQuery
+ * streaming insert API to insert data. This provides the lowest-latency insert path into BigQuery,
+ * and therefore is the default method when the input is unbounded. BigQuery will make a best
+ * effort to ensure no duplicates when using this path, however there are some scenarios in which
+ * BigQuery is unable to make this guarantee (see
+ * https://cloud.google.com/bigquery/streaming-data-into-bigquery). A query can be run over the
+ * output table to periodically clean these rare duplicates. Alternatively, using the Batch insert
+ * method does guarantee no duplicates, though the latency for the insert into BigQuery will be much
+ * higher.
+ */
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({ "google", "google cloud", "bq", "gcp", "bigquery", "record" })
+@CapabilityDescription("Load data into a Google BigQuery table using the streaming API. This processor "
+        + "is not intended to load large flow files as it will load the full content into memory. If "
+        + "you need to insert large flow files, consider using PutBigQueryBatch instead.")
+@SeeAlso({ PutBigQueryBatch.class })
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
+@WritesAttributes({
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
+})
+public class PutBigQueryStreaming extends AbstractBigQueryProcessor {
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.RECORD_READER_ATTR)
+            .displayName("Record Reader")
+            .description(BigQueryAttributes.RECORD_READER_DESC)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor SKIP_INVALID_ROWS = new PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.SKIP_INVALID_ROWS_ATTR)
+            .displayName("Skip Invalid Rows")
+            .description(BigQueryAttributes.SKIP_INVALID_ROWS_DESC)
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue("false")
+            .build();
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return ImmutableList.<PropertyDescriptor> builder()
+                .addAll(super.getSupportedPropertyDescriptors())
+                .add(RECORD_READER)
+                .add(SKIP_INVALID_ROWS)
+                .build();
+    }
+
+    @Override
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        super.onScheduled(context);
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
+        final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+        final TableId tableId;
+        if (StringUtils.isEmpty(projectId)) {
+            tableId = TableId.of(dataset, tableName);
+        } else {
+            tableId = TableId.of(projectId, dataset, tableName);
+        }
+
+        try {
+
+            InsertAllRequest.Builder request = InsertAllRequest.newBuilder(tableId);
+            int nbrecord = 0;
+
+            try (final InputStream in = session.read(flowFile)) {
+                final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+                try (final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+                    Record currentRecord;
+                    while ((currentRecord = reader.nextRecord()) != null) {
+                        request.addRow(convertMapRecord(currentRecord.toMap()));
+                        nbrecord++;
+                    }
+                }
+            }
+
+            request.setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).evaluateAttributeExpressions(flowFile).asBoolean());
+            request.setSkipInvalidRows(context.getProperty(SKIP_INVALID_ROWS).evaluateAttributeExpressions(flowFile).asBoolean());
+
+            InsertAllResponse response = getCloudService().insertAll(request.build());
+
+            final Map<String, String> attributes = new HashMap<>();
+
+            if (response.hasErrors()) {
+                getLogger().log(LogLevel.WARN, "Failed to insert {} of {} records into BigQuery {} table.", new Object[] { response.getInsertErrors().size(), nbrecord, tableName });
+                if (getLogger().isDebugEnabled()) {
+                    for (long index : response.getInsertErrors().keySet()) {
+                        for (BigQueryError e : response.getInsertErrors().get(index)) {
+                            getLogger().log(LogLevel.DEBUG, "Failed to insert record #{}: {}", new Object[] { index, e.getMessage() });
+                        }
+                    }
+                }
+
+                attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(nbrecord - response.getInsertErrors().size()));
+
+                flowFile = session.penalize(flowFile);
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                session.transfer(flowFile, REL_FAILURE);
+            } else {
+                attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(nbrecord));
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                session.transfer(flowFile, REL_SUCCESS);
+            }
+
+        } catch (Exception ex) {
+            getLogger().log(LogLevel.ERROR, ex.getMessage(), ex);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    private Map<String, Object> convertMapRecord(Map<String, Object> map) {
+        Map<String, Object> result = new HashMap<>();
+        for (String key : map.keySet()) {
+            Object obj = map.get(key);
+            if (obj instanceof MapRecord) {
+                result.put(key, convertMapRecord(((MapRecord) obj).toMap()));
+            } else if (obj instanceof Object[]
+                    && ((Object[]) obj).length > 0
+                    && ((Object[]) obj)[0] instanceof MapRecord) {
+                List<Map<String, Object>> lmapr = new ArrayList<>();
+                for (Object mapr : ((Object[]) obj)) {
+                    lmapr.add(convertMapRecord(((MapRecord) mapr).toMap()));
+                }
+                result.put(key, lmapr);
+            } else {
+                result.put(key, obj);
+            }
+        }
+        return result;
+    }
+
+}
\ No newline at end of file
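The processor above is essentially a record-oriented wrapper around BigQuery's insertAll() API. A standalone sketch of that underlying call, using the same client classes imported by the new file ("my_dataset"/"my_table" and the row columns are placeholders):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryError;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.InsertAllRequest;
    import com.google.cloud.bigquery.InsertAllResponse;
    import com.google.cloud.bigquery.TableId;

    public class StreamingInsertSketch {
        public static void main(String[] args) {
            // Application-default credentials; the processor resolves these via its controller service.
            BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();

            Map<String, Object> row = new HashMap<>();
            row.put("id", 1);              // placeholder columns
            row.put("name", "John Doe");

            InsertAllRequest request = InsertAllRequest.newBuilder(TableId.of("my_dataset", "my_table"))
                    .addRow(row)
                    .setSkipInvalidRows(true)       // mirrors the Skip Invalid Rows property
                    .setIgnoreUnknownValues(false)  // mirrors the Ignore Unknown Values property
                    .build();

            InsertAllResponse response = bigQuery.insertAll(request);
            if (response.hasErrors()) {
                // Per-row errors, keyed by the index of the offending row in the request.
                for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
                    System.err.println("row " + entry.getKey() + ": " + entry.getValue());
                }
            }
        }
    }

Because getInsertErrors() is keyed by row index, the processor can count partial failures, write the number of successfully inserted records to bq.records.count, and route the flow file accordingly.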
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 4ff9dfb2f1..9d26958e9b 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -18,4 +18,5 @@ org.apache.nifi.processors.gcp.storage.DeleteGCSObject
 org.apache.nifi.processors.gcp.storage.ListGCSBucket
 org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub
 org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
-org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
\ No newline at end of file
+org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
+org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java
index 52327d4b24..cba730fb02 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java
@@ -1,12 +1,12 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,13 +17,18 @@
 
 package org.apache.nifi.processors.gcp.bigquery;
 
-import com.google.cloud.bigquery.BigQuery;
-import com.google.cloud.bigquery.BigQueryOptions;
-import com.google.cloud.bigquery.Dataset;
-import com.google.cloud.bigquery.DatasetInfo;
-import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processors.gcp.GCPIntegrationTests;
+import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
+import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
+import org.apache.nifi.processors.gcp.credentials.factory.CredentialsFactory;
 import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.TestRunner;
@@ -31,24 +36,37 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertNull;
+import com.google.auth.Credentials;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.Dataset;
+import com.google.cloud.bigquery.DatasetInfo;
+import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
 
 @Category(GCPIntegrationTests.class)
 public abstract class AbstractBigQueryIT {
 
-    static final String CONTROLLER_SERVICE = "GCPCredentialsService";
+    protected static final String CONTROLLER_SERVICE = "GCPCredentialsService";
+    protected static final String PROJECT_ID = System.getProperty("test.gcp.project.id", "nifi");
+    protected static final String SERVICE_ACCOUNT_JSON = System.getProperty("test.gcp.service.account", "/path/to/service/account.json");
+
     protected static BigQuery bigquery;
     protected static Dataset dataset;
     protected static TestRunner runner;
 
+    private static final CredentialsFactory credentialsProviderFactory = new CredentialsFactory();
+
     @BeforeClass
-    public static void beforeClass() {
-        dataset = null;
+    public static void beforeClass() throws IOException {
+        final Map<PropertyDescriptor, String> propertiesMap = new HashMap<>();
+        propertiesMap.put(CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE, SERVICE_ACCOUNT_JSON);
+        Credentials credentials = credentialsProviderFactory.getGoogleCredentials(propertiesMap, new ProxyAwareTransportFactory(null));
+
+        BigQueryOptions bigQueryOptions = BigQueryOptions.newBuilder()
+                .setProjectId(PROJECT_ID)
+                .setCredentials(credentials)
+                .build();
+
+        bigquery = bigQueryOptions.getService();
 
         DatasetInfo datasetInfo = DatasetInfo.newBuilder(RemoteBigQueryHelper.generateDatasetName()).build();
@@ -67,9 +85,11 @@ public abstract class AbstractBigQueryIT {
     }
 
     protected TestRunner setCredentialsControllerService(TestRunner runner) throws InitializationException {
-        final Map<String, String> propertiesMap = new HashMap<>();
         final GCPCredentialsControllerService credentialsControllerService = new GCPCredentialsControllerService();
 
+        final Map<String, String> propertiesMap = new HashMap<>();
+        propertiesMap.put(CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE.getName(), SERVICE_ACCOUNT_JSON);
+
         runner.addControllerService(CONTROLLER_SERVICE, credentialsControllerService, propertiesMap);
         runner.enableControllerService(credentialsControllerService);
         runner.assertValid(credentialsControllerService);
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java
index 8686213e69..bd56340a49 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java
@@ -1,12 +1,12 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,7 +17,13 @@
 
 package org.apache.nifi.processors.gcp.bigquery;
 
-import com.google.cloud.bigquery.FormatOptions;
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
 import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
@@ -25,12 +31,7 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.BufferedWriter;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
+import com.google.cloud.bigquery.FormatOptions;
 
 public class PutBigQueryBatchIT extends AbstractBigQueryIT {
 
@@ -58,6 +59,7 @@ public class PutBigQueryBatchIT extends AbstractBigQueryIT {
     @Before
     public void setup() {
         runner = TestRunners.newTestRunner(PutBigQueryBatch.class);
+        runner.setProperty(AbstractBigQueryProcessor.PROJECT_ID, PROJECT_ID);
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamingIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamingIT.java
new file mode 100644
index 0000000000..6bed8be5b5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamingIT.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.file.Paths;
+import java.util.Iterator;
+
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.Field.Mode;
+import com.google.cloud.bigquery.FieldValueList;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardTableDefinition;
+import com.google.cloud.bigquery.TableDefinition;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.bigquery.TableResult;
+
+public class PutBigQueryStreamingIT extends AbstractBigQueryIT {
+
+    private Schema schema;
+
+    @Before
+    public void setup() throws InitializationException {
+        runner = TestRunners.newTestRunner(PutBigQueryStreaming.class);
+        runner = setCredentialsControllerService(runner);
+        runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
+        runner.setProperty(AbstractGCPProcessor.PROJECT_ID, PROJECT_ID);
+    }
+
+    private void createTable(String tableName) {
+        TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName);
+
+        // Table field definition
+        Field id = Field.newBuilder("id", LegacySQLTypeName.INTEGER).setMode(Mode.REQUIRED).build();
+        Field name = Field.newBuilder("name", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
+        Field alias = Field.newBuilder("alias", LegacySQLTypeName.STRING).setMode(Mode.REPEATED).build();
+
+        Field zip = Field.newBuilder("zip", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
+        Field city = Field.newBuilder("city", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
+        Field addresses = Field.newBuilder("addresses", LegacySQLTypeName.RECORD, zip, city).setMode(Mode.REPEATED).build();
+
+        Field position = Field.newBuilder("position", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
+        Field company = Field.newBuilder("company", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
+        Field job = Field.newBuilder("job", LegacySQLTypeName.RECORD, position, company).setMode(Mode.NULLABLE).build();
+
+        // Table schema definition
+        schema = Schema.of(id, name, alias, addresses, job);
+        TableDefinition tableDefinition = StandardTableDefinition.of(schema);
+        TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
+
+        // create table
+        bigquery.create(tableInfo);
+    }
+
+    private void deleteTable(String tableName) {
+        TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName);
+        bigquery.delete(tableId);
+    }
+
+    @Test
+    public void PutBigQueryStreamingNoError() throws Exception {
+        String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
+        createTable(tableName);
+
+        runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+        runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("reader", jsonReader);
+        runner.enableControllerService(jsonReader);
+
+        runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
+
+        runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-correct-data.json"));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "2");
+
+        TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
+        Iterator<FieldValueList> iterator = result.getValues().iterator();
+
+        FieldValueList firstElt = iterator.next();
+        FieldValueList sndElt = iterator.next();
+        assertTrue(firstElt.get("name").getStringValue().endsWith("Doe"));
+        assertTrue(sndElt.get("name").getStringValue().endsWith("Doe"));
+
+        FieldValueList john;
+        FieldValueList jane;
+        john = firstElt.get("name").getStringValue().equals("John Doe") ? firstElt : sndElt;
+        jane = firstElt.get("name").getStringValue().equals("Jane Doe") ? firstElt : sndElt;
+
+        assertEquals(jane.get("job").getRecordValue().get(0).getStringValue(), "Director");
+        assertTrue(john.get("alias").getRepeatedValue().size() == 2);
+        assertTrue(john.get("addresses").getRepeatedValue().get(0).getRecordValue().get(0).getStringValue().endsWith("000"));
+
+        deleteTable(tableName);
+    }
+
+    @Test
+    public void PutBigQueryStreamingFullError() throws Exception {
+        String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
+        createTable(tableName);
+
+        runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+        runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("reader", jsonReader);
+        runner.enableControllerService(jsonReader);
+
+        runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
+
+        runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-bad-data.json"));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_FAILURE, 1);
+        runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_FAILURE).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "0");
+
+        TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
+        assertFalse(result.getValues().iterator().hasNext());
+
+        deleteTable(tableName);
+    }
+
+    @Test
+    public void PutBigQueryStreamingPartialError() throws Exception {
+        String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
+        createTable(tableName);
+
+        runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+        runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("reader", jsonReader);
+        runner.enableControllerService(jsonReader);
+
+        runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
+        runner.setProperty(BigQueryAttributes.SKIP_INVALID_ROWS_ATTR, "true");
+
+        runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-bad-data.json"));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_FAILURE, 1);
+        runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_FAILURE).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "1");
+
+        TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
+        Iterator<FieldValueList> iterator = result.getValues().iterator();
+
+        FieldValueList firstElt = iterator.next();
+        assertFalse(iterator.hasNext());
+        assertEquals(firstElt.get("name").getStringValue(), "Jane Doe");
+
+        deleteTable(tableName);
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-bad-data.json b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-bad-data.json
new file mode 100644
index 0000000000..1a7ebba8bf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-bad-data.json
@@ -0,0 +1,35 @@
+[
+  {
+    "name": "John Doe",
+    "alias": ["john", "jd"],
+    "addresses": [
+      {
+        "zip": "1000",
+        "city": "NiFi"
+      },
+      {
+        "zip": "2000",
+        "city": "Bar"
+      }
+    ],
+    "job": {
+      "position": "Manager",
+      "company": "ASF"
+    }
+  },
+  {
+    "id": 2,
+    "name": "Jane Doe",
+    "alias": ["jane"],
+    "addresses": [
+      {
+        "zip": "1000",
+        "city": "NiFi"
+      }
+    ],
+    "job": {
+      "position": "Director",
+      "company": "ASF"
+    }
+  }
+]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data.json b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data.json
new file mode 100644
index 0000000000..994037f244
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data.json
@@ -0,0 +1,36 @@
+[
+  {
+    "id": 1,
+    "name": "John Doe",
+    "alias": ["john", "jd"],
+    "addresses": [
+      {
+        "zip": "1000",
+        "city": "NiFi"
+      },
+      {
+        "zip": "2000",
+        "city": "Bar"
+      }
+    ],
+    "job": {
+      "position": "Manager",
+      "company": "ASF"
+    }
+  },
+  {
+    "id": 2,
+    "name": "Jane Doe",
+    "alias": [],
+    "addresses": [
+      {
+        "zip": "1000",
+        "city": "NiFi"
+      }
+    ],
+    "job": {
+      "position": "Director",
+      "company": "ASF"
+    }
+  }
+]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
index 2e8a65aa3a..2f88a31b0c 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
@@ -27,7 +27,7 @@
     <packaging>pom</packaging>
 
-        0.101.0-alpha
+        0.107.0-alpha
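To make the convertMapRecord(...) conversion in PutBigQueryStreaming concrete, here is the row map it would hand to addRow(...) for the first record of streaming-correct-data.json, assuming JsonTreeReader surfaces nested JSON objects as MapRecord values and JSON arrays as Object[]: nested records become nested maps, arrays of records become lists of maps, and scalar arrays pass through unchanged.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class RowShapeSketch {
        public static void main(String[] args) {
            Map<String, Object> address1 = new HashMap<>();
            address1.put("zip", "1000");
            address1.put("city", "NiFi");

            Map<String, Object> address2 = new HashMap<>();
            address2.put("zip", "2000");
            address2.put("city", "Bar");

            Map<String, Object> job = new HashMap<>();
            job.put("position", "Manager");
            job.put("company", "ASF");

            Map<String, Object> row = new HashMap<>();
            row.put("id", 1);
            row.put("name", "John Doe");
            row.put("alias", new Object[] { "john", "jd" });          // scalar array: passed through as-is
            row.put("addresses", Arrays.asList(address1, address2));  // repeated record: list of maps
            row.put("job", job);                                      // nested record: nested map
            System.out.println(row);
        }
    }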