NIFI-6159 - Add BigQuery processor using the Streaming API

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #3394.
Pierre Villard 2019-03-27 16:52:47 +01:00
parent a0a66839c4
commit b12a9ad446
12 changed files with 628 additions and 135 deletions
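
For reviewers unfamiliar with the google-cloud-bigquery client, the streaming path added in this commit comes down to building an InsertAllRequest, calling insertAll, and checking the per-row errors on the response. A minimal standalone sketch condensed from the processor code below (the project/dataset/table names and the example row are placeholders, and it uses application default credentials instead of the processor's GCP credentials service):

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.Map;

public class StreamingInsertSketch {
    public static void main(String[] args) {
        // Placeholder credentials/IDs; PutBigQueryStreaming builds the client from its credentials service.
        BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();
        TableId tableId = TableId.of("my-project", "my_dataset", "my_table");

        Map<String, Object> row = new HashMap<>();
        row.put("id", 1);
        row.put("name", "John Doe");

        InsertAllRequest request = InsertAllRequest.newBuilder(tableId)
                .addRow(row)                      // one row per NiFi record
                .setIgnoreUnknownValues(true)     // "Ignore Unknown Values" property
                .setSkipInvalidRows(false)        // "Skip Invalid Rows" property
                .build();

        InsertAllResponse response = bigQuery.insertAll(request);
        if (response.hasErrors()) {
            // Row index -> errors; the processor penalizes the flow file and routes it to failure in this case.
            response.getInsertErrors().forEach((index, errors) -> System.err.println("row " + index + ": " + errors));
        }
    }
}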

nifi-gcp-processors/pom.xml

@ -38,6 +38,18 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
@ -88,6 +100,12 @@
<version>1.10.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services</artifactId>
<version>1.10.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
@ -100,6 +118,8 @@
<exclude>src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker</exclude>
<exclude>src/test/resources/mock-gcp-service-account.json</exclude>
<exclude>src/test/resources/mock-gcp-application-default-credentials.json</exclude>
<exclude>src/test/resources/bigquery/streaming-bad-data.json</exclude>
<exclude>src/test/resources/bigquery/streaming-correct-data.json</exclude>
</excludes>
</configuration>
</plugin>

AbstractBigQueryProcessor.java

@ -1,12 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -17,11 +17,13 @@
package org.apache.nifi.processors.gcp.bigquery;
import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@ -35,13 +37,11 @@ import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.util.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.common.collect.ImmutableList;
/**
* Base class for creating processors that connect to the GCP BigQuery service
@ -50,14 +50,12 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
static final int BUFFER_SIZE = 65536;
public static final Relationship REL_SUCCESS =
new Relationship.Builder().name("success")
.description("FlowFiles are routed to this relationship after a successful Google BigQuery operation.")
.build();
public static final Relationship REL_FAILURE =
new Relationship.Builder().name("failure")
.description("FlowFiles are routed to this relationship if the Google BigQuery operation fails.")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("FlowFiles are routed to this relationship after a successful Google BigQuery operation.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("FlowFiles are routed to this relationship if the Google BigQuery operation fails.")
.build();
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
@ -82,23 +80,14 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
.displayName("Table Schema")
.description(BigQueryAttributes.TABLE_SCHEMA_DESC)
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
.displayName("Read Timeout")
.description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC)
public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
.displayName("Ignore Unknown Values")
.description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
.required(true)
.defaultValue("5 minutes")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("false")
.build();
@Override
@ -108,12 +97,11 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.<PropertyDescriptor>builder()
return ImmutableList.<PropertyDescriptor> builder()
.addAll(super.getSupportedPropertyDescriptors())
.add(DATASET)
.add(TABLE_NAME)
.add(TABLE_SCHEMA)
.add(READ_TIMEOUT)
.add(IGNORE_UNKNOWN)
.build();
}
@ -153,7 +141,8 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
}
/**
* If sub-classes need to implement any custom validation, override this method and add validation results to the results.
* If sub-classes need to implement any custom validation, override this method and add
* validation results to the results.
*/
protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
}
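
As context for the customValidate hook above, a subclass needing extra checks would override it roughly like this (EXAMPLE_PROPERTY is a made-up descriptor, not part of this commit; types come from the imports already in this class):

@Override
protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
    // Hypothetical check on a made-up property, just to illustrate the extension point.
    final String value = validationContext.getProperty(EXAMPLE_PROPERTY).getValue();
    if (value == null || value.isEmpty()) {
        results.add(new ValidationResult.Builder()
                .subject(EXAMPLE_PROPERTY.getDisplayName())
                .valid(false)
                .explanation("Example Property must be set")
                .build());
    }
}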

BigQueryAttributes.java

@ -1,12 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -27,7 +27,8 @@ import com.google.cloud.bigquery.JobInfo;
* Attributes associated with the BigQuery processors
*/
public class BigQueryAttributes {
private BigQueryAttributes() {}
private BigQueryAttributes() {
}
public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
@ -102,7 +103,12 @@ public class BigQueryAttributes {
+ "can interpret logical types into their corresponding types (such as TIMESTAMP) instead of only using their raw "
+ "types (such as INTEGER).";
public static final String RECORD_READER_ATTR = "bq.record.reader";
public static final String RECORD_READER_DESC = "Specifies the Controller Service to use for parsing incoming data.";
public static final String SKIP_INVALID_ROWS_ATTR = "bq.skip.invalid.rows";
public static final String SKIP_INVALID_ROWS_DESC = "Sets whether to insert all valid rows of a request, even if invalid "
+ "rows exist. If not set the entire insert request will fail if it contains an invalid row.";
// Batch Attributes
public static final String JOB_CREATE_TIME_ATTR = "bq.job.stat.creation_time";
@ -129,7 +135,6 @@ public class BigQueryAttributes {
public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
public static final String JOB_ERROR_LOCATION_DESC = "Load job error location";
// Allowable values
public static final AllowableValue CREATE_IF_NEEDED = new AllowableValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(),
JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), "Configures the job to create the table if it does not exist.");

PutBigQueryBatch.java

@ -1,12 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -17,16 +17,14 @@
package org.apache.nifi.processors.gcp.bigquery;
import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.common.collect.ImmutableList;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -52,39 +50,33 @@ import org.apache.nifi.util.StringUtils;
import org.threeten.bp.Duration;
import org.threeten.bp.temporal.ChronoUnit;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.common.collect.ImmutableList;
/**
* A processor for batch loading data into a Google BigQuery table
*/
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"google", "google cloud", "bq", "bigquery"})
@Tags({ "google", "google cloud", "bq", "bigquery" })
@CapabilityDescription("Batch loads flow files content to a Google BigQuery table.")
@SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
@SeeAlso({ PutGCSObject.class, DeleteGCSObject.class })
@WritesAttributes({
@WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC),
@WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC),
@WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC),
@WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC),
@WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
@WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
@WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
@WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
@WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
})
public class PutBigQueryBatch extends AbstractBigQueryProcessor {
@ -99,7 +91,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
return builder.valid(true).explanation("Contains Expression Language").build();
}
if(TYPES.contains(input.toUpperCase())) {
if (TYPES.contains(input.toUpperCase())) {
builder.valid(true);
} else {
builder.valid(false).explanation("Load File Type must be one of the following options: " + StringUtils.join(TYPES, ", "));
@ -109,8 +101,26 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
}
};
public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
.Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
.displayName("Read Timeout")
.description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC)
.required(true)
.defaultValue("5 minutes")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
.displayName("Table Schema")
.description(BigQueryAttributes.TABLE_SCHEMA_DESC)
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
.displayName("Load file type")
.description(BigQueryAttributes.SOURCE_TYPE_DESC)
.required(true)
@ -118,16 +128,6 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
.displayName("Ignore Unknown Values")
.description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("false")
.build();
public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
.displayName("Create Disposition")
@ -225,13 +225,14 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.<PropertyDescriptor>builder()
return ImmutableList.<PropertyDescriptor> builder()
.addAll(super.getSupportedPropertyDescriptors())
.add(TABLE_SCHEMA)
.add(READ_TIMEOUT)
.add(SOURCE_TYPE)
.add(CREATE_DISPOSITION)
.add(WRITE_DISPOSITION)
.add(MAXBAD_RECORDS)
.add(IGNORE_UNKNOWN)
.add(CSV_ALLOW_JAGGED_ROWS)
.add(CSV_ALLOW_QUOTED_NEW_LINES)
.add(CSV_CHARSET)
@ -271,7 +272,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
FormatOptions formatOption;
if(type.equals(FormatOptions.csv().getType())) {
if (type.equals(FormatOptions.csv().getType())) {
formatOption = FormatOptions.csv().toBuilder()
.setAllowJaggedRows(context.getProperty(CSV_ALLOW_JAGGED_ROWS).asBoolean())
.setAllowQuotedNewLines(context.getProperty(CSV_ALLOW_QUOTED_NEW_LINES).asBoolean())
@ -285,18 +286,17 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
}
final Schema schema = BigQueryUtils.schemaFromString(context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions(flowFile).getValue());
final WriteChannelConfiguration writeChannelConfiguration =
WriteChannelConfiguration.newBuilder(tableId)
final WriteChannelConfiguration writeChannelConfiguration = WriteChannelConfiguration.newBuilder(tableId)
.setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
.setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
.setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
.setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).evaluateAttributeExpressions(flowFile).asBoolean())
.setUseAvroLogicalTypes(context.getProperty(AVRO_USE_LOGICAL_TYPES).asBoolean())
.setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
.setSchema(schema)
.setFormatOptions(formatOption)
.build();
try ( TableDataWriteChannel writer = getCloudService().writer(writeChannelConfiguration) ) {
try (TableDataWriteChannel writer = getCloudService().writer(writeChannelConfiguration)) {
session.read(flowFile, rawIn -> {
ReadableByteChannel readableByteChannel = Channels.newChannel(rawIn);
@ -337,7 +337,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
flowFile = session.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
// add the number of records successfully added
if(job.getStatistics() instanceof LoadStatistics) {
if (job.getStatistics() instanceof LoadStatistics) {
final LoadStatistics stats = (LoadStatistics) job.getStatistics();
attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(stats.getOutputRows()));
}
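
For contrast with the streaming insert sketch near the top, the batch processor above funnels the flow file content through a write channel and waits for the resulting load job. Roughly, with placeholder dataset/table names and a hard-coded JSON payload instead of the flow file stream:

import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.threeten.bp.Duration;

public class BatchLoadSketch {
    public static void main(String[] args) throws Exception {
        BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();   // placeholder credentials
        WriteChannelConfiguration configuration = WriteChannelConfiguration
                .newBuilder(TableId.of("my_dataset", "my_table"))
                .setFormatOptions(FormatOptions.json())   // NEWLINE_DELIMITED_JSON content
                .build();

        TableDataWriteChannel writer = bigQuery.writer(configuration);
        writer.write(ByteBuffer.wrap("{\"id\": 1}\n".getBytes(StandardCharsets.UTF_8)));
        writer.close();   // the load job is created once the channel is closed

        // Equivalent of the processor's "Read Timeout" property.
        Job job = writer.getJob().waitFor(RetryOption.totalTimeout(Duration.ofMinutes(5)));
        System.out.println("load job done, error = " + job.getStatus().getError());
    }
}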

PutBigQueryStreaming.java (new file)

@ -0,0 +1,201 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.StringUtils;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.ImmutableList;
/**
* A processor for streaming data into a Google BigQuery table. It uses the BigQuery
* streaming insert API to insert data. This provides the lowest-latency insert path into BigQuery,
* and therefore is the default method when the input is unbounded. BigQuery will make a strong
* effort to ensure no duplicates when using this path, however there are some scenarios in which
* BigQuery is unable to make this guarantee (see
* https://cloud.google.com/bigquery/streaming-data-into-bigquery). A query can be run over the
* output table to periodically clean these rare duplicates. Alternatively, using the Batch insert
* method does guarantee no duplicates, though the latency for the insert into BigQuery will be much
* higher.
*/
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({ "google", "google cloud", "bq", "gcp", "bigquery", "record" })
@CapabilityDescription("Load data into Google BigQuery table using the streaming API. This processor "
+ "is not intended to load large flow files as it will load the full content into memory. If "
+ "you need to insert large flow files, consider using PutBigQueryBatch instead.")
@SeeAlso({ PutBigQueryBatch.class })
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@WritesAttributes({
@WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
})
public class PutBigQueryStreaming extends AbstractBigQueryProcessor {
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.RECORD_READER_ATTR)
.displayName("Record Reader")
.description(BigQueryAttributes.RECORD_READER_DESC)
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();
public static final PropertyDescriptor SKIP_INVALID_ROWS = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.SKIP_INVALID_ROWS_ATTR)
.displayName("Skip Invalid Rows")
.description(BigQueryAttributes.SKIP_INVALID_ROWS_DESC)
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("false")
.build();
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.<PropertyDescriptor> builder()
.addAll(super.getSupportedPropertyDescriptors())
.add(RECORD_READER)
.add(SKIP_INVALID_ROWS)
.build();
}
@Override
@OnScheduled
public void onScheduled(ProcessContext context) {
super.onScheduled(context);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final TableId tableId;
if (StringUtils.isEmpty(projectId)) {
tableId = TableId.of(dataset, tableName);
} else {
tableId = TableId.of(projectId, dataset, tableName);
}
try {
InsertAllRequest.Builder request = InsertAllRequest.newBuilder(tableId);
int nbrecord = 0;
try (final InputStream in = session.read(flowFile)) {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
try (final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());) {
Record currentRecord;
while ((currentRecord = reader.nextRecord()) != null) {
request.addRow(convertMapRecord(currentRecord.toMap()));
nbrecord++;
}
}
}
request.setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).evaluateAttributeExpressions(flowFile).asBoolean());
request.setSkipInvalidRows(context.getProperty(SKIP_INVALID_ROWS).evaluateAttributeExpressions(flowFile).asBoolean());
InsertAllResponse response = getCloudService().insertAll(request.build());
final Map<String, String> attributes = new HashMap<>();
if (response.hasErrors()) {
getLogger().log(LogLevel.WARN, "Failed to insert {} of {} records into BigQuery {} table.", new Object[] { response.getInsertErrors().size(), nbrecord, tableName });
if (getLogger().isDebugEnabled()) {
for (long index : response.getInsertErrors().keySet()) {
for (BigQueryError e : response.getInsertErrors().get(index)) {
getLogger().log(LogLevel.DEBUG, "Failed to insert record #{}: {}", new Object[] { index, e.getMessage() });
}
}
}
attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(nbrecord - response.getInsertErrors().size()));
flowFile = session.penalize(flowFile);
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_FAILURE);
} else {
attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(nbrecord));
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
}
} catch (Exception ex) {
getLogger().log(LogLevel.ERROR, ex.getMessage(), ex);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
private Map<String, Object> convertMapRecord(Map<String, Object> map) {
Map<String, Object> result = new HashMap<String, Object>();
for (String key : map.keySet()) {
Object obj = map.get(key);
if (obj instanceof MapRecord) {
result.put(key, convertMapRecord(((MapRecord) obj).toMap()));
} else if (obj instanceof Object[]
&& ((Object[]) obj).length > 0
&& ((Object[]) obj)[0] instanceof MapRecord) {
List<Map<String, Object>> lmapr = new ArrayList<Map<String, Object>>();
for (Object mapr : ((Object[]) obj)) {
lmapr.add(convertMapRecord(((MapRecord) mapr).toMap()));
}
result.put(key, lmapr);
} else {
result.put(key, obj);
}
}
return result;
}
}
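
To make convertMapRecord above concrete, this is roughly the row map it produces for the first record of the streaming-correct-data.json fixture further down (class and method names are just for illustration): nested Records become nested Maps, arrays of Records become Lists of Maps, and other arrays pass through unchanged.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class ExampleRow {
    static Map<String, Object> buildExampleRow() {
        Map<String, Object> address = new HashMap<>();
        address.put("zip", "1000");
        address.put("city", "NiFi");

        Map<String, Object> job = new HashMap<>();
        job.put("position", "Manager");
        job.put("company", "ASF");

        Map<String, Object> row = new HashMap<>();
        row.put("id", 1);
        row.put("name", "John Doe");
        row.put("alias", new Object[] { "john", "jd" });   // non-Record array is passed through as-is
        row.put("addresses", Arrays.asList(address));      // REPEATED RECORD becomes a List of Maps
        row.put("job", job);                               // RECORD becomes a nested Map
        return row;                                        // shape handed to InsertAllRequest.Builder#addRow
    }
}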

src/main/resources/META-INF/services/org.apache.nifi.processor.Processor

@ -18,4 +18,5 @@ org.apache.nifi.processors.gcp.storage.DeleteGCSObject
org.apache.nifi.processors.gcp.storage.ListGCSBucket
org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub
org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming
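
The extra line in this services manifest is what makes the new processor discoverable: NiFi loads Processor implementations through the standard ServiceLoader mechanism. A conceptual sketch (the class name is made up; the framework does this per NAR class loader):

import java.util.ServiceLoader;
import org.apache.nifi.processor.Processor;

public class DiscoverySketch {
    public static void main(String[] args) {
        for (Processor processor : ServiceLoader.load(Processor.class)) {
            System.out.println(processor.getClass().getName());   // now includes PutBigQueryStreaming
        }
    }
}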

AbstractBigQueryIT.java

@ -1,12 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -17,13 +17,18 @@
package org.apache.nifi.processors.gcp.bigquery;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import static org.junit.Assert.assertNull;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processors.gcp.GCPIntegrationTests;
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
import org.apache.nifi.processors.gcp.credentials.factory.CredentialsFactory;
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
@ -31,24 +36,37 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertNull;
import com.google.auth.Credentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
@Category(GCPIntegrationTests.class)
public abstract class AbstractBigQueryIT {
static final String CONTROLLER_SERVICE = "GCPCredentialsService";
protected static final String CONTROLLER_SERVICE = "GCPCredentialsService";
protected static final String PROJECT_ID = System.getProperty("test.gcp.project.id", "nifi");
protected static final String SERVICE_ACCOUNT_JSON = System.getProperty("test.gcp.service.account", "/path/to/service/account.json");
protected static BigQuery bigquery;
protected static Dataset dataset;
protected static TestRunner runner;
private static final CredentialsFactory credentialsProviderFactory = new CredentialsFactory();
@BeforeClass
public static void beforeClass() {
dataset = null;
public static void beforeClass() throws IOException {
final Map<PropertyDescriptor, String> propertiesMap = new HashMap<>();
propertiesMap.put(CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE, SERVICE_ACCOUNT_JSON);
Credentials credentials = credentialsProviderFactory.getGoogleCredentials(propertiesMap, new ProxyAwareTransportFactory(null));
BigQueryOptions bigQueryOptions = BigQueryOptions.newBuilder()
.setProjectId(PROJECT_ID)
.setCredentials(credentials)
.build();
bigquery = bigQueryOptions.getService();
DatasetInfo datasetInfo = DatasetInfo.newBuilder(RemoteBigQueryHelper.generateDatasetName()).build();
@ -67,9 +85,11 @@ public abstract class AbstractBigQueryIT {
}
protected TestRunner setCredentialsControllerService(TestRunner runner) throws InitializationException {
final Map<String, String> propertiesMap = new HashMap<>();
final GCPCredentialsControllerService credentialsControllerService = new GCPCredentialsControllerService();
final Map<String, String> propertiesMap = new HashMap<>();
propertiesMap.put(CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE.getName(), SERVICE_ACCOUNT_JSON);
runner.addControllerService(CONTROLLER_SERVICE, credentialsControllerService, propertiesMap);
runner.enableControllerService(credentialsControllerService);
runner.assertValid(credentialsControllerService);

PutBigQueryBatchIT.java

@ -1,12 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -17,7 +17,13 @@
package org.apache.nifi.processors.gcp.bigquery;
import com.google.cloud.bigquery.FormatOptions;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
@ -25,12 +31,7 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import com.google.cloud.bigquery.FormatOptions;
public class PutBigQueryBatchIT extends AbstractBigQueryIT {
@ -58,6 +59,7 @@ public class PutBigQueryBatchIT extends AbstractBigQueryIT {
@Before
public void setup() {
runner = TestRunners.newTestRunner(PutBigQueryBatch.class);
runner.setProperty(AbstractBigQueryProcessor.PROJECT_ID, PROJECT_ID);
}
@Test

PutBigQueryStreamingIT.java (new file)

@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.nio.file.Paths;
import java.util.Iterator;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Field.Mode;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TableResult;
public class PutBigQueryStreamingIT extends AbstractBigQueryIT {
private Schema schema;
@Before
public void setup() throws InitializationException {
runner = TestRunners.newTestRunner(PutBigQueryStreaming.class);
runner = setCredentialsControllerService(runner);
runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
runner.setProperty(AbstractGCPProcessor.PROJECT_ID, PROJECT_ID);
}
private void createTable(String tableName) {
TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName);
// Table field definition
Field id = Field.newBuilder("id", LegacySQLTypeName.INTEGER).setMode(Mode.REQUIRED).build();
Field name = Field.newBuilder("name", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
Field alias = Field.newBuilder("alias", LegacySQLTypeName.STRING).setMode(Mode.REPEATED).build();
Field zip = Field.newBuilder("zip", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
Field city = Field.newBuilder("city", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
Field addresses = Field.newBuilder("addresses", LegacySQLTypeName.RECORD, zip, city).setMode(Mode.REPEATED).build();
Field position = Field.newBuilder("position", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
Field company = Field.newBuilder("company", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
Field job = Field.newBuilder("job", LegacySQLTypeName.RECORD, position, company).setMode(Mode.NULLABLE).build();
// Table schema definition
schema = Schema.of(id, name, alias, addresses, job);
TableDefinition tableDefinition = StandardTableDefinition.of(schema);
TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
// create table
bigquery.create(tableInfo);
}
private void deleteTable(String tableName) {
TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName);
bigquery.delete(tableId);
}
@Test
public void PutBigQueryStreamingNoError() throws Exception {
String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
createTable(tableName);
runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.enableControllerService(jsonReader);
runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-correct-data.json"));
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "2");
TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
Iterator<FieldValueList> iterator = result.getValues().iterator();
FieldValueList firstElt = iterator.next();
FieldValueList sndElt = iterator.next();
assertTrue(firstElt.get("name").getStringValue().endsWith("Doe"));
assertTrue(sndElt.get("name").getStringValue().endsWith("Doe"));
FieldValueList john;
FieldValueList jane;
john = firstElt.get("name").getStringValue().equals("John Doe") ? firstElt : sndElt;
jane = firstElt.get("name").getStringValue().equals("Jane Doe") ? firstElt : sndElt;
assertEquals(jane.get("job").getRecordValue().get(0).getStringValue(), "Director");
assertTrue(john.get("alias").getRepeatedValue().size() == 2);
assertTrue(john.get("addresses").getRepeatedValue().get(0).getRecordValue().get(0).getStringValue().endsWith("000"));
deleteTable(tableName);
}
@Test
public void PutBigQueryStreamingFullError() throws Exception {
String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
createTable(tableName);
runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.enableControllerService(jsonReader);
runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-bad-data.json"));
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_FAILURE, 1);
runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_FAILURE).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "0");
TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
assertFalse(result.getValues().iterator().hasNext());
deleteTable(tableName);
}
@Test
public void PutBigQueryStreamingPartialError() throws Exception {
String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
createTable(tableName);
runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.enableControllerService(jsonReader);
runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
runner.setProperty(BigQueryAttributes.SKIP_INVALID_ROWS_ATTR, "true");
runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-bad-data.json"));
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_FAILURE, 1);
runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_FAILURE).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "1");
TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
Iterator<FieldValueList> iterator = result.getValues().iterator();
FieldValueList firstElt = iterator.next();
assertFalse(iterator.hasNext());
assertEquals(firstElt.get("name").getStringValue(), "Jane Doe");
deleteTable(tableName);
}
}

src/test/resources/bigquery/streaming-bad-data.json (new file; the first record intentionally omits the required id field)

@ -0,0 +1,35 @@
[
{
"name": "John Doe",
"alias": ["john", "jd"],
"addresses": [
{
"zip": "1000",
"city": "NiFi"
},
{
"zip": "2000",
"city": "Bar"
}
],
"job": {
"position": "Manager",
"company": "ASF"
}
},
{
"id": 2,
"name": "Jane Doe",
"alias": ["jane"],
"addresses": [
{
"zip": "1000",
"city": "NiFi"
}
],
"job": {
"position": "Director",
"company": "ASF"
}
}
]

src/test/resources/bigquery/streaming-correct-data.json (new file)

@ -0,0 +1,36 @@
[
{
"id": 1,
"name": "John Doe",
"alias": ["john", "jd"],
"addresses": [
{
"zip": "1000",
"city": "NiFi"
},
{
"zip": "2000",
"city": "Bar"
}
],
"job": {
"position": "Manager",
"company": "ASF"
}
},
{
"id": 2,
"name": "Jane Doe",
"alias": [],
"addresses": [
{
"zip": "1000",
"city": "NiFi"
}
],
"job": {
"position": "Director",
"company": "ASF"
}
}
]

nifi-gcp-bundle/pom.xml

@ -27,7 +27,7 @@
<packaging>pom</packaging>
<properties>
<google.cloud.sdk.version>0.101.0-alpha</google.cloud.sdk.version>
<google.cloud.sdk.version>0.107.0-alpha</google.cloud.sdk.version>
</properties>
<dependencyManagement>