NIFI-10403 Add processor supporting the new BigQuery Write API

Signed-off-by: Arpad Boda <aboda@apache.org>

This closes #6344
Csaba Bejan 2022-08-29 21:13:05 +02:00 committed by Arpad Boda
parent 111c7ac0a4
commit 35fb66f50f
16 changed files with 1439 additions and 509 deletions

View File

@ -47,7 +47,7 @@ public interface Record {
boolean isDropUnknownFields();
/**
* Updates the Record's schema to to incorporate all of the fields in the given schema. If both schemas have a
* Updates the Record's schema to incorporate all of the fields in the given schema. If both schemas have a
* field with the same name but a different type, then the existing schema will be updated to have a
* {@link RecordFieldType#CHOICE} field with both types as choices. If two fields have the same name but different
* default values, then the default value that is already in place will remain the default value, unless the current

View File

@ -94,6 +94,20 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquerystorage</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>

View File

@ -28,7 +28,6 @@ import com.google.cloud.bigquery.Schema;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
/**
* Util class for schema manipulation
*/

View File

@ -0,0 +1,451 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BQTableSchemaToProtoDescriptor;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.CivilTimeEncoder;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
import com.google.cloud.bigquery.storage.v1.StorageError;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import io.grpc.Status;
import java.time.LocalTime;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.bigquery.proto.ProtoUtils;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@TriggerSerially
@EventDriven
@Tags({"google", "google cloud", "bq", "bigquery"})
@CapabilityDescription("Unified processor for batch and stream flow files content to a Google BigQuery table via the Storage Write API." +
"The processor is record based so the used schema is driven by the RecordReader. Attributes that are not matched to the target schema" +
"are skipped. Exactly once delivery semantics are achieved via stream offsets. The Storage Write API is more efficient than the older " +
"insertAll method because it uses gRPC streaming rather than REST over HTTP")
@SeeAlso({PutBigQueryBatch.class, PutBigQueryStreaming.class})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@WritesAttributes({
@WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
})
public class PutBigQuery extends AbstractBigQueryProcessor {
static final String STREAM = "STREAM";
static final String BATCH = "BATCH";
static final AllowableValue STREAM_TYPE = new AllowableValue(STREAM, STREAM, "Use streaming record handling strategy");
static final AllowableValue BATCH_TYPE = new AllowableValue(BATCH, BATCH, "Use batching record handling strategy");
private static final String APPEND_RECORD_COUNT_NAME = "bq.append.record.count";
private static final String APPEND_RECORD_COUNT_DESC = "The number of records to be appended to the write stream at once. Applicable for both batch and stream types";
private static final String TRANSFER_TYPE_NAME = "bq.transfer.type";
private static final String TRANSFER_TYPE_DESC = "Defines the preferred transfer type: streaming or batching";
private static final List<Status.Code> RETRYABLE_ERROR_CODES = Arrays.asList(Status.Code.INTERNAL, Status.Code.ABORTED, Status.Code.CANCELLED);
private final AtomicReference<Exception> error = new AtomicReference<>();
private final AtomicInteger appendSuccessCount = new AtomicInteger(0);
private final Phaser inflightRequestCount = new Phaser(1);
private TableName tableName = null;
private BigQueryWriteClient writeClient = null;
private StreamWriter streamWriter = null;
private String transferType;
private int maxRetryCount;
private int recordBatchCount;
private boolean skipInvalidRows;
static final PropertyDescriptor TRANSFER_TYPE = new PropertyDescriptor.Builder()
.name(TRANSFER_TYPE_NAME)
.displayName("Transfer Type")
.description(TRANSFER_TYPE_DESC)
.required(true)
.defaultValue(STREAM_TYPE.getValue())
.allowableValues(STREAM_TYPE, BATCH_TYPE)
.build();
static final PropertyDescriptor APPEND_RECORD_COUNT = new PropertyDescriptor.Builder()
.name(APPEND_RECORD_COUNT_NAME)
.displayName("Append Record Count")
.description(APPEND_RECORD_COUNT_DESC)
.required(true)
.defaultValue("20")
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.RECORD_READER_ATTR)
.displayName("Record Reader")
.description(BigQueryAttributes.RECORD_READER_DESC)
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();
public static final PropertyDescriptor SKIP_INVALID_ROWS = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.SKIP_INVALID_ROWS_ATTR)
.displayName("Skip Invalid Rows")
.description(BigQueryAttributes.SKIP_INVALID_ROWS_DESC)
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("false")
.build();
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
descriptors.add(TRANSFER_TYPE);
descriptors.add(RECORD_READER);
descriptors.add(APPEND_RECORD_COUNT);
descriptors.add(SKIP_INVALID_ROWS);
descriptors.remove(IGNORE_UNKNOWN);
return Collections.unmodifiableList(descriptors);
}
@Override
@OnScheduled
public void onScheduled(ProcessContext context) {
super.onScheduled(context);
transferType = context.getProperty(TRANSFER_TYPE).getValue();
maxRetryCount = context.getProperty(RETRY_COUNT).asInteger();
skipInvalidRows = context.getProperty(SKIP_INVALID_ROWS).asBoolean();
recordBatchCount = context.getProperty(APPEND_RECORD_COUNT).asInteger();
tableName = TableName.of(context.getProperty(PROJECT_ID).getValue(), context.getProperty(DATASET).getValue(), context.getProperty(TABLE_NAME).getValue());
writeClient = createWriteClient(getGoogleCredentials(context));
}
@OnUnscheduled
public void onUnscheduled() {
writeClient.shutdown();
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) {
WriteStream writeStream;
Descriptors.Descriptor protoDescriptor;
try {
writeStream = createWriteStream();
protoDescriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(writeStream.getTableSchema());
streamWriter = createStreamWriter(writeStream.getName(), protoDescriptor, getGoogleCredentials(context));
} catch (Descriptors.DescriptorValidationException | IOException e) {
getLogger().error("Failed to create Big Query Stream Writer for writing", e);
context.yield();
return;
}
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
int recordNumWritten;
try {
try (InputStream in = session.read(flowFile)) {
RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
try (RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
recordNumWritten = writeRecordsToStream(reader, protoDescriptor);
}
}
flowFile = session.putAttribute(flowFile, BigQueryAttributes.JOB_NB_RECORDS_ATTR, Integer.toString(recordNumWritten));
} catch (Exception e) {
getLogger().error("Writing Records failed", e);
error.set(e);
} finally {
finishProcessing(session, flowFile, streamWriter, writeStream.getName(), tableName.toString());
}
}
private int writeRecordsToStream(RecordReader reader, Descriptors.Descriptor descriptor) throws Exception {
Record currentRecord;
int offset = 0;
int recordNum = 0;
ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
while ((currentRecord = reader.nextRecord()) != null) {
DynamicMessage message = recordToProtoMessage(currentRecord, descriptor);
if (message == null) {
continue;
}
rowsBuilder.addSerializedRows(message.toByteString());
if (++recordNum % recordBatchCount == 0) {
append(new AppendContext(rowsBuilder.build(), offset));
rowsBuilder = ProtoRows.newBuilder();
offset = recordNum;
}
}
if (recordNum > offset) {
append(new AppendContext(rowsBuilder.build(), offset));
}
return recordNum;
}
private DynamicMessage recordToProtoMessage(Record record, Descriptors.Descriptor descriptor) {
Map<String, Object> valueMap = convertMapRecord(record.toMap());
DynamicMessage message = null;
try {
message = ProtoUtils.createMessage(descriptor, valueMap);
} catch (RuntimeException e) {
getLogger().error("Cannot convert record to message", e);
if (!skipInvalidRows) {
throw e;
}
}
return message;
}
private void append(AppendContext appendContext) throws Exception {
if (error.get() != null) {
throw error.get();
}
ApiFuture<AppendRowsResponse> future = streamWriter.append(appendContext.getData(), appendContext.getOffset());
ApiFutures.addCallback(future, new AppendCompleteCallback(appendContext), Runnable::run);
inflightRequestCount.register();
}
private void finishProcessing(ProcessSession session, FlowFile flowFile, StreamWriter streamWriter, String streamName, String parentTable) {
// Wait for all in-flight requests to complete.
inflightRequestCount.arriveAndAwaitAdvance();
// Close the connection to the server.
streamWriter.close();
// Verify that no error occurred in the stream.
if (error.get() != null) {
getLogger().error("Stream processing failed", error.get());
flowFile = session.putAttribute(flowFile, BigQueryAttributes.JOB_NB_RECORDS_ATTR, isBatch() ? "0" : String.valueOf(appendSuccessCount.get() * recordBatchCount));
session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
} else {
if (isBatch()) {
writeClient.finalizeWriteStream(streamName);
BatchCommitWriteStreamsRequest commitRequest =
BatchCommitWriteStreamsRequest.newBuilder()
.setParent(parentTable)
.addWriteStreams(streamName)
.build();
BatchCommitWriteStreamsResponse commitResponse = writeClient.batchCommitWriteStreams(commitRequest);
// If the response does not have a commit time, it means the commit operation failed.
if (!commitResponse.hasCommitTime()) {
for (StorageError err : commitResponse.getStreamErrorsList()) {
getLogger().error("Commit Storage Error Code: {} with message {}", err.getCode().name(), err.getErrorMessage());
}
session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
getLogger().info("Appended and committed all records successfully.");
}
session.transfer(flowFile, REL_SUCCESS);
}
}
class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {
private final AppendContext appendContext;
public AppendCompleteCallback(AppendContext appendContext) {
this.appendContext = appendContext;
}
public void onSuccess(AppendRowsResponse response) {
getLogger().info("Append success with offset: {}", appendContext.getOffset());
appendSuccessCount.incrementAndGet();
inflightRequestCount.arriveAndDeregister();
}
public void onFailure(Throwable throwable) {
// If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information,
// see: https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
Status status = Status.fromThrowable(throwable);
if (appendContext.getRetryCount() < maxRetryCount && RETRYABLE_ERROR_CODES.contains(status.getCode())) {
appendContext.incrementRetryCount();
try {
append(appendContext);
inflightRequestCount.arriveAndDeregister();
return;
} catch (Exception e) {
getLogger().error("Failed to retry append", e);
}
}
error.compareAndSet(null, Optional.ofNullable(Exceptions.toStorageException(throwable))
.map(RuntimeException.class::cast)
.orElse(new RuntimeException(throwable)));
getLogger().error("Failure during appending data", throwable);
inflightRequestCount.arriveAndDeregister();
}
}
private WriteStream createWriteStream() {
WriteStream.Type type = isBatch() ? WriteStream.Type.PENDING : WriteStream.Type.COMMITTED;
CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest.newBuilder()
.setParent(tableName.toString())
.setWriteStream(WriteStream.newBuilder().setType(type).build())
.build();
return writeClient.createWriteStream(createWriteStreamRequest);
}
protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials) {
BigQueryWriteClient client;
try {
client = BigQueryWriteClient.create(BigQueryWriteSettings.newBuilder().setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build());
} catch (Exception e) {
throw new ProcessException("Failed to create Big Query Write Client for writing", e);
}
return client;
}
protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials) throws IOException {
ProtoSchema protoSchema = ProtoSchemaConverter.convert(descriptor);
return StreamWriter.newBuilder(streamName)
.setWriterSchema(protoSchema)
.setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build();
}
private boolean isBatch() {
return BATCH_TYPE.getValue().equals(transferType);
}
private static class AppendContext {
private final ProtoRows data;
private final long offset;
private int retryCount;
AppendContext(ProtoRows data, long offset) {
this.data = data;
this.offset = offset;
this.retryCount = 0;
}
public ProtoRows getData() {
return data;
}
public int getRetryCount() {
return retryCount;
}
public void incrementRetryCount() {
retryCount++;
}
public long getOffset() {
return offset;
}
}
private static Map<String, Object> convertMapRecord(Map<String, Object> map) {
Map<String, Object> result = new HashMap<>();
for (String key : map.keySet()) {
Object obj = map.get(key);
if (obj instanceof MapRecord) {
result.put(key, convertMapRecord(((MapRecord) obj).toMap()));
} else if (obj instanceof Object[]
&& ((Object[]) obj).length > 0
&& ((Object[]) obj)[0] instanceof MapRecord) {
List<Map<String, Object>> lmapr = new ArrayList<>();
for (Object mapr : ((Object[]) obj)) {
lmapr.add(convertMapRecord(((MapRecord) mapr).toMap()));
}
result.put(key, lmapr);
} else if (obj instanceof Timestamp) {
result.put(key, ((Timestamp) obj).getTime() * 1000);
} else if (obj instanceof Time) {
LocalTime time = ((Time) obj).toLocalTime();
org.threeten.bp.LocalTime localTime = org.threeten.bp.LocalTime.of(
time.getHour(),
time.getMinute(),
time.getSecond());
result.put(key, CivilTimeEncoder.encodePacked64TimeMicros(localTime));
} else if (obj instanceof Date) {
result.put(key, (int) ((Date) obj).toLocalDate().toEpochDay());
} else {
result.put(key, obj);
}
}
return result;
}
}
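
For orientation, below is a minimal, hypothetical sketch of how the new processor can be exercised with NiFi's TestRunner, closely mirroring the integration tests added later in this commit. The project, dataset and table names are placeholders, the target table is assumed to already exist, and the credentials controller service is left at its defaults.

package org.apache.nifi.processors.gcp.bigquery;

import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

import java.nio.file.Paths;

public class PutBigQueryUsageSketch {

    public static void main(String[] args) throws Exception {
        TestRunner runner = TestRunners.newTestRunner(PutBigQuery.class);

        // Credentials controller service; left unconfigured here, adjust as needed for a real project.
        GCPCredentialsControllerService credentials = new GCPCredentialsControllerService();
        runner.addControllerService("gcp-credentials", credentials);
        runner.enableControllerService(credentials);
        runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, "gcp-credentials");

        // The schema is driven entirely by the configured RecordReader.
        JsonTreeReader reader = new JsonTreeReader();
        runner.addControllerService("reader", reader);
        runner.enableControllerService(reader);
        runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");

        // Target table coordinates (placeholders); the table must already exist in BigQuery.
        runner.setProperty(AbstractGCPProcessor.PROJECT_ID, "my-gcp-project");
        runner.setProperty(BigQueryAttributes.DATASET_ATTR, "my_dataset");
        runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, "my_table");

        // Streaming transfer, appending 100 records per AppendRows request.
        runner.setProperty(PutBigQuery.TRANSFER_TYPE, PutBigQuery.STREAM_TYPE.getValue());
        runner.setProperty(PutBigQuery.APPEND_RECORD_COUNT, "100");

        runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-correct-data.json"));
        runner.run();
        runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS, 1);
    }
}

Switching TRANSFER_TYPE to BATCH_TYPE is the only change needed for batch mode; records are then appended to a PENDING write stream and only become visible once the processor finalizes and commits the stream at the end of FlowFile processing.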

View File

@ -30,6 +30,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -63,10 +64,12 @@ import java.util.concurrent.TimeUnit;
/**
* A processor for batch loading data into a Google BigQuery table
* @deprecated use {@link PutBigQuery} instead which uses the Write API
*/
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@DeprecationNotice(alternatives = {PutBigQuery.class}, reason = "This processor is deprecated and may be removed in future releases.")
@Tags({ "google", "google cloud", "bq", "bigquery" })
@CapabilityDescription("Batch loads flow files content to a Google BigQuery table.")
@CapabilityDescription("Please be aware this processor is deprecated and may be removed in the near future. Use PutBigQuery instead. Batch loads flow files content to a Google BigQuery table.")
@SeeAlso({ PutGCSObject.class, DeleteGCSObject.class })
@WritesAttributes({
@WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
@ -79,6 +82,7 @@ import java.util.concurrent.TimeUnit;
@WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC),
@WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
})
@Deprecated
public class PutBigQueryBatch extends AbstractBigQueryProcessor {
private static final List<String> TYPES = Arrays.asList(FormatOptions.json().getType(), FormatOptions.csv().getType(), FormatOptions.avro().getType());

View File

@ -27,6 +27,7 @@ import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -67,10 +68,14 @@ import java.util.Map;
* output table to periodically clean these rare duplicates. Alternatively, using the Batch insert
* method does guarantee no duplicates, though the latency for the insert into BigQuery will be much
* higher.
*
* @deprecated use {@link PutBigQuery} instead which uses the Write API
*/
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@DeprecationNotice(alternatives = {PutBigQuery.class}, reason = "This processor is deprecated and may be removed in future releases.")
@Tags({ "google", "google cloud", "bq", "gcp", "bigquery", "record" })
@CapabilityDescription("Load data into Google BigQuery table using the streaming API. This processor "
@CapabilityDescription("Please be aware this processor is deprecated and may be removed in the near future. Use PutBigQuery instead. "
+ "Load data into Google BigQuery table using the streaming API. This processor "
+ "is not intended to load large flow files as it will load the full content into memory. If "
+ "you need to insert large flow files, consider using PutBigQueryBatch instead.")
@SeeAlso({ PutBigQueryBatch.class })
@ -78,8 +83,12 @@ import java.util.Map;
@WritesAttributes({
@WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
})
@Deprecated
public class PutBigQueryStreaming extends AbstractBigQueryProcessor {
private static final DateTimeFormatter timestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS");
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name(BigQueryAttributes.RECORD_READER_ATTR)
.displayName("Record Reader")
@ -98,9 +107,6 @@ public class PutBigQueryStreaming extends AbstractBigQueryProcessor {
.defaultValue("false")
.build();
private static final DateTimeFormatter timestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS");
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
@ -182,8 +188,8 @@ public class PutBigQueryStreaming extends AbstractBigQueryProcessor {
if (obj instanceof MapRecord) {
result.put(key, convertMapRecord(((MapRecord) obj).toMap()));
} else if (obj instanceof Object[]
&& ((Object[]) obj).length > 0
&& ((Object[]) obj)[0] instanceof MapRecord) {
&& ((Object[]) obj).length > 0
&& ((Object[]) obj)[0] instanceof MapRecord) {
List<Map<String, Object>> lmapr = new ArrayList<Map<String, Object>>();
for (Object mapr : ((Object[]) obj)) {
lmapr.add(convertMapRecord(((MapRecord) mapr).toMap()));
@ -198,7 +204,7 @@ public class PutBigQueryStreaming extends AbstractBigQueryProcessor {
// ZoneOffset.UTC time zone is necessary due to implicit time zone conversion in Record Readers from
// the local system time zone to the GMT time zone
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Time) obj).getTime()), ZoneOffset.UTC);
result.put(key, dateTime.format(timeFormatter) );
result.put(key, dateTime.format(timeFormatter));
} else if (obj instanceof Date) {
result.put(key, obj.toString());
} else {

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery.proto;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
/**
* Util class for protocol buffer messaging
*/
public class ProtoUtils {
public static DynamicMessage createMessage(Descriptors.Descriptor descriptor, Map<String, Object> valueMap) {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
for (Descriptors.FieldDescriptor field : descriptor.getFields()) {
String name = field.getName();
Object value = valueMap.get(name);
if (value == null) {
continue;
}
if (Descriptors.FieldDescriptor.Type.MESSAGE.equals(field.getType())) {
if (field.isRepeated()) {
Collection collection = value.getClass().isArray() ? Arrays.asList((Object[]) value) : (Collection) value;
collection.forEach(act -> builder.addRepeatedField(field, createMessage(field.getMessageType(), (Map<String, Object>) act)));
} else {
builder.setField(field, createMessage(field.getMessageType(), (Map<String, Object>) value));
}
} else {
// Integer in the BigQuery table schema maps to INT64, which corresponds to Long on the Java side:
// https://developers.google.com/protocol-buffers/docs/proto3
if (value instanceof Integer && (field.getType() == Descriptors.FieldDescriptor.Type.INT64)) {
value = Long.valueOf((Integer) value);
}
if (field.isRepeated()) {
Collection collection = value.getClass().isArray() ? Arrays.asList((Object[]) value) : (Collection) value;
collection.forEach(act -> builder.addRepeatedField(field, act));
} else {
builder.setField(field, value);
}
}
}
return builder.build();
}
}
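
To illustrate what ProtoUtils.createMessage does with the value map produced by PutBigQuery.convertMapRecord, here is a small sketch that hand-builds a two-field protobuf descriptor and converts a map into a DynamicMessage. In the processor the descriptor is derived from the BigQuery table schema via BQTableSchemaToProtoDescriptor; the message and field names below are purely illustrative.

package org.apache.nifi.processors.gcp.bigquery.proto;

import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FileDescriptor;
import com.google.protobuf.DynamicMessage;

import java.util.HashMap;
import java.util.Map;

public class ProtoUtilsSketch {

    public static void main(String[] args) throws Exception {
        // Hand-built descriptor for: message Row { optional string name = 1; optional int64 id = 2; }
        DescriptorProto rowType = DescriptorProto.newBuilder()
                .setName("Row")
                .addField(FieldDescriptorProto.newBuilder()
                        .setName("name").setNumber(1)
                        .setType(FieldDescriptorProto.Type.TYPE_STRING)
                        .setLabel(FieldDescriptorProto.Label.LABEL_OPTIONAL))
                .addField(FieldDescriptorProto.newBuilder()
                        .setName("id").setNumber(2)
                        .setType(FieldDescriptorProto.Type.TYPE_INT64)
                        .setLabel(FieldDescriptorProto.Label.LABEL_OPTIONAL))
                .build();

        FileDescriptor file = FileDescriptor.buildFrom(
                FileDescriptorProto.newBuilder().setName("row.proto").addMessageType(rowType).build(),
                new FileDescriptor[0]);
        Descriptor descriptor = file.findMessageTypeByName("Row");

        // Value map of the shape produced by convertMapRecord(); the Integer is coerced to Long
        // inside createMessage() because the target field type is INT64.
        Map<String, Object> values = new HashMap<>();
        values.put("name", "Jane Doe");
        values.put("id", 42);

        DynamicMessage message = ProtoUtils.createMessage(descriptor, values);
        System.out.println(message);
    }
}

Note how the Integer value for the INT64 field is accepted: createMessage coerces it to Long before setting the field on the builder.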

View File

@ -20,6 +20,7 @@ org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub
org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
org.apache.nifi.processors.gcp.pubsub.lite.PublishGCPubSubLite
org.apache.nifi.processors.gcp.pubsub.lite.ConsumeGCPubSubLite
org.apache.nifi.processors.gcp.bigquery.PutBigQuery
org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming
org.apache.nifi.processors.gcp.drive.ListGoogleDrive

View File

@ -0,0 +1,58 @@
<!DOCTYPE html>
<html lang="en" xmlns="http://www.w3.org/1999/html">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>PutBigQuery</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>
<h1>Streaming Versus Batching Data</h1>
<p>
PutBigQuery is record based and relies on the gRPC-based Write API using protocol buffers. The underlying write stream supports both streaming and batching approaches.
</p>
<h3>Streaming</h3>
<p>
With streaming, the data appended to the stream is instantly available in BigQuery for reading. The number of records (rows) appended at once is configurable.
Only one stream is established per FlowFile, so at the conclusion of FlowFile processing the stream is closed and a new one is opened for the next FlowFile.
Exactly once delivery semantics are supported via stream offsets.
</p>
<h3>Batching</h3>
<p>
Similarly to the streaming approach, one stream is opened for each FlowFile and records are appended to it. However, data is not available in BigQuery until it is
committed by the processor at the end of FlowFile processing.
</p>
<h1>Improvement opportunities</h1>
<p>
<ul>
<li>The table has to exist on BigQuery side it is not created automatically</li>
<li>The Write API supports multiple streams for parallel execution and transactionality across streams. This is not utilized at the moment as this would be covered on NiFI framework level.</li>
</ul>
</p>
<p>
The official <a href="https://cloud.google.com/bigquery/docs/write-api">Google Write API documentation</a> provides additional details.
</p>
</body>
</html>

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.processors.gcp.bigquery;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
@ -26,30 +25,26 @@ import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
/**
* Base class for BigQuery Unit Tests. Provides a framework for creating a TestRunner instance with always-required credentials.
*/
@ExtendWith(MockitoExtension.class)
public abstract class AbstractBQTest {
private static final String PROJECT_ID = System.getProperty("test.gcp.project.id", "nifi-test-gcp-project");
private static final Integer RETRIES = 9;
static final String DATASET = RemoteBigQueryHelper.generateDatasetName();
@BeforeEach
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
}
public static TestRunner buildNewRunner(Processor processor) throws Exception {
final GCPCredentialsService credentialsService = new GCPCredentialsControllerService();
@ -84,13 +79,8 @@ public abstract class AbstractBQTest {
final BigQueryOptions options = processor.getServiceOptions(runner.getProcessContext(),
mockCredentials);
assertEquals("Project IDs should match",
PROJECT_ID, options.getProjectId());
assertEquals("Retry counts should match",
RETRIES.intValue(), options.getRetrySettings().getMaxAttempts());
assertSame("Credentials should be configured correctly",
mockCredentials, options.getCredentials());
assertEquals(PROJECT_ID, options.getProjectId(), "Project IDs should match");
assertEquals(RETRIES.intValue(), options.getRetrySettings().getMaxAttempts(), "Retry counts should match");
assertSame(mockCredentials, options.getCredentials(), "Credentials should be configured correctly");
}
}

View File

@ -1,139 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery;
import com.google.cloud.bigquery.FormatOptions;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
public class PutBigQueryBatchIT extends AbstractBigQueryIT {
private static final String TABLE_SCHEMA_STRING = "[\n" +
" {\n" +
" \"description\": \"field 1\",\n" +
" \"mode\": \"REQUIRED\",\n" +
" \"name\": \"field_1\",\n" +
" \"type\": \"STRING\"\n" +
" },\n" +
" {\n" +
" \"description\": \"field 2\",\n" +
" \"mode\": \"REQUIRED\",\n" +
" \"name\": \"field_2\",\n" +
" \"type\": \"STRING\"\n" +
" },\n" +
" {\n" +
" \"description\": \"field 3\",\n" +
" \"mode\": \"NULLABLE\",\n" +
" \"name\": \"field_3\",\n" +
" \"type\": \"STRING\"\n" +
" }\n" +
"]";
@BeforeEach
public void setup() {
runner = TestRunners.newTestRunner(PutBigQueryBatch.class);
runner.setProperty(AbstractBigQueryProcessor.PROJECT_ID, PROJECT_ID);
}
@Test
public void PutBigQueryBatchSmallPayloadTest() throws Exception {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
runner = setCredentialsControllerService(runner);
runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, methodName);
runner.setProperty(BigQueryAttributes.SOURCE_TYPE_ATTR, FormatOptions.json().getType());
runner.setProperty(BigQueryAttributes.TABLE_SCHEMA_ATTR, TABLE_SCHEMA_STRING);
String str = "{\"field_1\":\"Daniel is great\",\"field_2\":\"Daniel is great\"}\r\n";
runner.enqueue(new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8)));
runner.run(1);
for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(AbstractBigQueryProcessor.REL_SUCCESS)) {
validateNoServiceExceptionAttribute(flowFile);
}
runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_SUCCESS, 1);
runner.assertAllFlowFilesContainAttribute(BigQueryAttributes.JOB_ID_ATTR);
}
@Test
public void PutBigQueryBatchBadRecordTest() throws Exception {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
runner = setCredentialsControllerService(runner);
runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, methodName);
runner.setProperty(BigQueryAttributes.SOURCE_TYPE_ATTR, FormatOptions.json().getType());
runner.setProperty(BigQueryAttributes.TABLE_SCHEMA_ATTR, TABLE_SCHEMA_STRING);
String str = "{\"field_1\":\"Daniel is great\"}\r\n";
runner.enqueue(new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8)));
runner.run(1);
runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_FAILURE, 1);
}
@Test
public void PutBigQueryBatchLargePayloadTest() throws InitializationException, IOException {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
runner = setCredentialsControllerService(runner);
runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, methodName);
runner.setProperty(BigQueryAttributes.SOURCE_TYPE_ATTR, FormatOptions.json().getType());
runner.setProperty(BigQueryAttributes.TABLE_SCHEMA_ATTR, TABLE_SCHEMA_STRING);
// Allow one bad record to deal with the extra line break.
runner.setProperty(BigQueryAttributes.MAX_BADRECORDS_ATTR, String.valueOf(1));
String str = "{\"field_1\":\"Daniel is great\",\"field_2\":\"Here's to the crazy ones. The misfits. The rebels. The troublemakers." +
" The round pegs in the square holes. The ones who see things differently. They're not fond of rules. And they have no respect" +
" for the status quo. You can quote them, disagree with them, glorify or vilify them. About the only thing you can't do is ignore" +
" them. Because they change things. They push the human race forward. And while some may see them as the crazy ones, we see genius." +
" Because the people who are crazy enough to think they can change the world, are the ones who do.\"}\n";
Path tempFile = Files.createTempFile(methodName, "");
try (BufferedWriter writer = Files.newBufferedWriter(tempFile)) {
for (int i = 0; i < 2; i++) {
for (int ii = 0; ii < 1_000_000; ii++) {
writer.write(str);
}
writer.flush();
}
writer.flush();
}
runner.enqueue(tempFile);
runner.run(1);
for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(AbstractBigQueryProcessor.REL_SUCCESS)) {
validateNoServiceExceptionAttribute(flowFile);
}
runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_SUCCESS, 1);
}
}

View File

@ -25,14 +25,12 @@ import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.JobStatus;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
@ -40,30 +38,23 @@ import org.mockito.Mock;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.when;
/**
* Unit tests for {@link PutBigQueryBatch}.
*/
public class PutBigQueryBatchTest extends AbstractBQTest {
private static final String TABLENAME = "test_table";
private static final String TABLE_NAME = "test_table";
private static final String TABLE_SCHEMA = "[{ \"mode\": \"NULLABLE\", \"name\": \"data\", \"type\": \"STRING\" }]";
private static final String SOURCE_TYPE = FormatOptions.json().getType();
private static final String CREATE_DISPOSITION = JobInfo.CreateDisposition.CREATE_IF_NEEDED.name();
private static final String WRITE_DISPOSITION = JobInfo.WriteDisposition.WRITE_EMPTY.name();
private static final String MAXBAD_RECORDS = "0";
private static final String MAX_BAD_RECORDS = "0";
private static final String IGNORE_UNKNOWN = "true";
private static final String READ_TIMEOUT = "5 minutes";
@Mock
BigQuery bq;
@Mock
Table table;
@Mock
Job job;
@ -79,16 +70,6 @@ public class PutBigQueryBatchTest extends AbstractBQTest {
@Mock
TableDataWriteChannel tableDataWriteChannel;
@BeforeEach
public void setup() throws Exception {
super.setup();
reset(bq);
reset(table);
reset(job);
reset(jobStatus);
reset(stats);
}
@Override
public AbstractBigQueryProcessor getProcessor() {
return new PutBigQueryBatch() {
@ -107,20 +88,18 @@ public class PutBigQueryBatchTest extends AbstractBQTest {
@Override
protected void addRequiredPropertiesToRunner(TestRunner runner) {
runner.setProperty(PutBigQueryBatch.DATASET, DATASET);
runner.setProperty(PutBigQueryBatch.TABLE_NAME, TABLENAME);
runner.setProperty(PutBigQueryBatch.TABLE_NAME, TABLE_NAME);
runner.setProperty(PutBigQueryBatch.TABLE_SCHEMA, TABLE_SCHEMA);
runner.setProperty(PutBigQueryBatch.SOURCE_TYPE, SOURCE_TYPE);
runner.setProperty(PutBigQueryBatch.CREATE_DISPOSITION, CREATE_DISPOSITION);
runner.setProperty(PutBigQueryBatch.WRITE_DISPOSITION, WRITE_DISPOSITION);
runner.setProperty(PutBigQueryBatch.MAXBAD_RECORDS, MAXBAD_RECORDS);
runner.setProperty(PutBigQueryBatch.MAXBAD_RECORDS, MAX_BAD_RECORDS);
runner.setProperty(PutBigQueryBatch.IGNORE_UNKNOWN, IGNORE_UNKNOWN);
runner.setProperty(PutBigQueryBatch.READ_TIMEOUT, READ_TIMEOUT);
}
@Test
public void testSuccessfulLoad() throws Exception {
when(table.exists()).thenReturn(Boolean.TRUE);
when(bq.create(ArgumentMatchers.isA(JobInfo.class))).thenReturn(job);
when(bq.writer(ArgumentMatchers.isA(WriteChannelConfiguration.class))).thenReturn(tableDataWriteChannel);
when(tableDataWriteChannel.getJob()).thenReturn(job);
when(job.waitFor(ArgumentMatchers.isA(RetryOption.class))).thenReturn(job);
@ -152,17 +131,9 @@ public class PutBigQueryBatchTest extends AbstractBQTest {
@Test
public void testFailedLoad() throws Exception {
when(table.exists()).thenReturn(Boolean.TRUE);
when(bq.create(ArgumentMatchers.isA(JobInfo.class))).thenReturn(job);
when(bq.writer(ArgumentMatchers.isA(WriteChannelConfiguration.class))).thenReturn(tableDataWriteChannel);
when(tableDataWriteChannel.getJob()).thenReturn(job);
when(job.waitFor(ArgumentMatchers.isA(RetryOption.class))).thenThrow(BigQueryException.class);
when(job.getStatus()).thenReturn(jobStatus);
when(job.getStatistics()).thenReturn(stats);
when(stats.getCreationTime()).thenReturn(0L);
when(stats.getStartTime()).thenReturn(1L);
when(stats.getEndTime()).thenReturn(2L);
final TestRunner runner = buildNewRunner(getProcessor());
addRequiredPropertiesToRunner(runner);

View File

@ -0,0 +1,368 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TableResult;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.serialization.DateTimeUtils;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.format.DateTimeFormatter;
import java.util.Iterator;
import java.util.UUID;
import static org.apache.nifi.processors.gcp.bigquery.PutBigQuery.BATCH_TYPE;
import static org.apache.nifi.processors.gcp.bigquery.PutBigQuery.STREAM_TYPE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Restructured and enhanced version of the existing PutBigQuery integration tests. The underlying classes are deprecated, so those tests will be removed in the future.
*
*/
public class PutBigQueryIT extends AbstractBigQueryIT {
private Schema schema;
@BeforeEach
public void setup() throws InitializationException {
runner = TestRunners.newTestRunner(PutBigQuery.class);
runner = setCredentialsControllerService(runner);
runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
runner.setProperty(AbstractGCPProcessor.PROJECT_ID, PROJECT_ID);
}
@Test
public void testStreamingNoError() throws Exception {
String tableName = prepareTable(STREAM_TYPE);
addRecordReader();
runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-correct-data.json"));
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(PutBigQuery.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "2");
assertStreamingData(tableName);
deleteTable(tableName);
}
@Test
public void testStreamingFullError() throws Exception {
String tableName = prepareTable(STREAM_TYPE);
addRecordReader();
runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-bad-data.json"));
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_FAILURE, 1);
runner.getFlowFilesForRelationship(PutBigQuery.REL_FAILURE).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "0");
TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
assertFalse(result.getValues().iterator().hasNext());
deleteTable(tableName);
}
@Test
public void testStreamingPartialError() throws Exception {
String tableName = prepareTable(STREAM_TYPE);
addRecordReader();
runner.setProperty(BigQueryAttributes.SKIP_INVALID_ROWS_ATTR, "true");
runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-bad-data.json"));
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(PutBigQuery.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "1");
TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
Iterator<FieldValueList> iterator = result.getValues().iterator();
FieldValueList firstElt = iterator.next();
assertFalse(iterator.hasNext());
assertEquals(firstElt.get("name").getStringValue(), "Jane Doe");
deleteTable(tableName);
}
@Test
public void testStreamingNoErrorWithDate() throws Exception {
String tableName = prepareTable(STREAM_TYPE);
addRecordReaderWithSchema("src/test/resources/bigquery/schema-correct-data-with-date.avsc");
runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-correct-data-with-date.json"));
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(PutBigQuery.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "2");
assertStreamingData(tableName, true, false);
deleteTable(tableName);
}
@Test
public void testStreamingNoErrorWithDateFormat() throws Exception {
String tableName = prepareTable(STREAM_TYPE);
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
final String recordSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/bigquery/schema-correct-data-with-date.avsc")));
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, recordSchema);
runner.setProperty(jsonReader, DateTimeUtils.DATE_FORMAT, "MM/dd/yyyy");
runner.setProperty(jsonReader, DateTimeUtils.TIME_FORMAT, "HH:mm:ss");
runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT, "MM-dd-yyyy HH:mm:ss Z");
runner.enableControllerService(jsonReader);
runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json"));
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(PutBigQuery.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "2");
assertStreamingData(tableName, false, true);
deleteTable(tableName);
}
@Test
public void testBatchSmallPayload() throws Exception {
String tableName = prepareTable(BATCH_TYPE);
addRecordReader();
String str = "{\"field_1\":\"Daniel is great\",\"field_2\":\"Daniel is great\"}\r\n";
runner.enqueue(new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8)));
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(PutBigQuery.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "1");
deleteTable(tableName);
}
@Test
public void testQueryBatchBadRecord() throws Exception {
String tableName = prepareTable(BATCH_TYPE);
addRecordReader();
String str = "{\"field_1\":\"Daniel is great\"}\r\n";
runner.enqueue(new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8)));
runner.run();
runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_FAILURE, 1);
deleteTable(tableName);
}
@Test
public void testBatchLargePayload() throws InitializationException, IOException {
String tableName = prepareTable(BATCH_TYPE);
addRecordReader();
runner.setProperty(PutBigQuery.APPEND_RECORD_COUNT, "5000");
String str = "{\"field_1\":\"Daniel is great\",\"field_2\":\"Here's to the crazy ones. The misfits. The rebels. The troublemakers." +
" The round pegs in the square holes. The ones who see things differently. They're not fond of rules. And they have no respect" +
" for the status quo. You can quote them, disagree with them, glorify or vilify them. About the only thing you can't do is ignore" +
" them. Because they change things. They push the human race forward. And while some may see them as the crazy ones, we see genius." +
" Because the people who are crazy enough to think they can change the world, are the ones who do.\"}\n";
Path tempFile = Files.createTempFile(tableName, "");
int recordCount = 100_000;
try (BufferedWriter writer = Files.newBufferedWriter(tempFile)) {
for (int i = 0; i < recordCount; i++) {
writer.write(str);
}
writer.flush();
}
runner.enqueue(tempFile);
runner.run();
runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(PutBigQuery.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Integer.toString(recordCount));
}
private String prepareTable(AllowableValue transferType) {
String tableName = UUID.randomUUID().toString();
if (STREAM_TYPE.equals(transferType)) {
createTableForStream(tableName);
} else {
createTableForBatch(tableName);
}
runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
runner.setProperty(PutBigQuery.TRANSFER_TYPE, transferType);
return tableName;
}
private void addRecordReader() throws InitializationException {
JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.enableControllerService(jsonReader);
runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
}
private void addRecordReaderWithSchema(String schema) throws InitializationException, IOException {
JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
String recordSchema = new String(Files.readAllBytes(Paths.get(schema)));
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, recordSchema);
runner.enableControllerService(jsonReader);
}
private void createTableForStream(String tableName) {
TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName);
// Table field definition
Field id = Field.newBuilder("id", LegacySQLTypeName.INTEGER).setMode(Field.Mode.REQUIRED).build();
Field name = Field.newBuilder("name", LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build();
Field alias = Field.newBuilder("alias", LegacySQLTypeName.STRING).setMode(Field.Mode.REPEATED).build();
Field zip = Field.newBuilder("zip", LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build();
Field city = Field.newBuilder("city", LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build();
Field addresses = Field.newBuilder("addresses", LegacySQLTypeName.RECORD, zip, city).setMode(Field.Mode.REPEATED).build();
Field position = Field.newBuilder("position", LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build();
Field company = Field.newBuilder("company", LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build();
Field job = Field.newBuilder("job", LegacySQLTypeName.RECORD, position, company).setMode(Field.Mode.NULLABLE).build();
Field date = Field.newBuilder("date", LegacySQLTypeName.DATE).setMode(Field.Mode.NULLABLE).build();
Field time = Field.newBuilder("time", LegacySQLTypeName.TIME).setMode(Field.Mode.NULLABLE).build();
Field full = Field.newBuilder("full", LegacySQLTypeName.TIMESTAMP).setMode(Field.Mode.NULLABLE).build();
Field birth = Field.newBuilder("birth", LegacySQLTypeName.RECORD, date, time, full).setMode(Field.Mode.NULLABLE).build();
// Table schema definition
schema = Schema.of(id, name, alias, addresses, job, birth);
TableDefinition tableDefinition = StandardTableDefinition.of(schema);
TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
// create table
bigquery.create(tableInfo);
}
private void createTableForBatch(String tableName) {
TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName);
// Table field definition
Field field1 = Field.newBuilder("field_1", LegacySQLTypeName.STRING).setMode(Field.Mode.REQUIRED).build();
Field field2 = Field.newBuilder("field_2", LegacySQLTypeName.STRING).setMode(Field.Mode.REQUIRED).build();
Field field3 = Field.newBuilder("field_3", LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build();
// Table schema definition
schema = Schema.of(field1, field2, field3);
TableDefinition tableDefinition = StandardTableDefinition.of(schema);
TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
// create table
bigquery.create(tableInfo);
}
private void deleteTable(String tableName) {
TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName);
bigquery.delete(tableId);
}
private void assertStreamingData(String tableName) {
assertStreamingData(tableName, false, false);
}
private void assertStreamingData(String tableName, boolean assertDate, boolean assertDateFormatted) {
TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
Iterator<FieldValueList> iterator = result.getValues().iterator();
FieldValueList firstElt = iterator.next();
FieldValueList sndElt = iterator.next();
assertTrue(firstElt.get("name").getStringValue().endsWith("Doe"));
assertTrue(sndElt.get("name").getStringValue().endsWith("Doe"));
FieldValueList john;
FieldValueList jane;
john = firstElt.get("name").getStringValue().equals("John Doe") ? firstElt : sndElt;
jane = firstElt.get("name").getStringValue().equals("Jane Doe") ? firstElt : sndElt;
assertEquals(jane.get("job").getRecordValue().get(0).getStringValue(), "Director");
assertEquals(2, john.get("alias").getRepeatedValue().size());
assertTrue(john.get("addresses").getRepeatedValue().get(0).getRecordValue().get(0).getStringValue().endsWith("000"));
if (assertDate) {
FieldValueList johnFields = john.get("birth").getRecordValue();
FieldValueList janeFields = jane.get("birth").getRecordValue();
ZonedDateTime johnDateTime = Instant.parse("2021-07-18T12:35:24Z").atZone(ZoneId.systemDefault());
assertEquals(johnDateTime.toLocalDate().format(DateTimeFormatter.ISO_DATE), johnFields.get(0).getStringValue());
assertEquals(johnDateTime.toLocalTime().format(DateTimeFormatter.ISO_TIME), johnFields.get(1).getStringValue());
assertEquals(johnDateTime.toInstant().toEpochMilli(), (johnFields.get(2).getTimestampValue() / 1000));
ZonedDateTime janeDateTime = Instant.parse("1992-01-01T00:00:00Z").atZone(ZoneId.systemDefault());
assertEquals(janeDateTime.toLocalDate().format(DateTimeFormatter.ISO_DATE), janeFields.get(0).getStringValue());
assertEquals(janeDateTime.toLocalTime().format(DateTimeFormatter.ISO_TIME), janeFields.get(1).getStringValue());
assertEquals(janeDateTime.toInstant().toEpochMilli(), (janeFields.get(2).getTimestampValue() / 1000));
}
if (assertDateFormatted) {
FieldValueList johnFields = john.get("birth").getRecordValue();
FieldValueList janeFields = jane.get("birth").getRecordValue();
assertEquals("2021-07-18", johnFields.get(0).getStringValue());
assertEquals("12:35:24", johnFields.get(1).getStringValue());
assertEquals(Instant.parse("2021-07-18T12:35:24Z").toEpochMilli(), (johnFields.get(2).getTimestampValue() / 1000));
assertEquals("1992-01-01", janeFields.get(0).getStringValue());
assertEquals("00:00:00", janeFields.get(1).getStringValue());
assertEquals(Instant.parse("1992-01-01T00:00:00Z").toEpochMilli(), (janeFields.get(2).getTimestampValue() / 1000));
}
}
}

View File

@ -1,306 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Field.Mode;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TableResult;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.serialization.DateTimeUtils;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Iterator;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class PutBigQueryStreamingIT extends AbstractBigQueryIT {
private Schema schema;
@BeforeEach
public void setup() throws InitializationException {
runner = TestRunners.newTestRunner(PutBigQueryStreaming.class);
runner = setCredentialsControllerService(runner);
runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
runner.setProperty(AbstractGCPProcessor.PROJECT_ID, PROJECT_ID);
}
private void createTable(String tableName) {
TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName);
// Table field definition
Field id = Field.newBuilder("id", LegacySQLTypeName.INTEGER).setMode(Mode.REQUIRED).build();
Field name = Field.newBuilder("name", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
Field alias = Field.newBuilder("alias", LegacySQLTypeName.STRING).setMode(Mode.REPEATED).build();
Field zip = Field.newBuilder("zip", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
Field city = Field.newBuilder("city", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
Field addresses = Field.newBuilder("addresses", LegacySQLTypeName.RECORD, zip, city).setMode(Mode.REPEATED).build();
Field position = Field.newBuilder("position", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
Field company = Field.newBuilder("company", LegacySQLTypeName.STRING).setMode(Mode.NULLABLE).build();
Field job = Field.newBuilder("job", LegacySQLTypeName.RECORD, position, company).setMode(Mode.NULLABLE).build();
Field date = Field.newBuilder("date", LegacySQLTypeName.DATE).setMode(Mode.NULLABLE).build();
Field time = Field.newBuilder("time", LegacySQLTypeName.TIME).setMode(Mode.NULLABLE).build();
Field full = Field.newBuilder("full", LegacySQLTypeName.TIMESTAMP).setMode(Mode.NULLABLE).build();
Field birth = Field.newBuilder("birth", LegacySQLTypeName.RECORD, date, time, full).setMode(Mode.NULLABLE).build();
// Table schema definition
schema = Schema.of(id, name, alias, addresses, job, birth);
TableDefinition tableDefinition = StandardTableDefinition.of(schema);
TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
// create table
bigquery.create(tableInfo);
}
private void deleteTable(String tableName) {
TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName);
bigquery.delete(tableId);
}
@Test
public void PutBigQueryStreamingNoError() throws Exception {
String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
createTable(tableName);
runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.enableControllerService(jsonReader);
runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-correct-data.json"));
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "2");
TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
Iterator<FieldValueList> iterator = result.getValues().iterator();
FieldValueList firstElt = iterator.next();
FieldValueList sndElt = iterator.next();
assertTrue(firstElt.get("name").getStringValue().endsWith("Doe"));
assertTrue(sndElt.get("name").getStringValue().endsWith("Doe"));
FieldValueList john;
FieldValueList jane;
john = firstElt.get("name").getStringValue().equals("John Doe") ? firstElt : sndElt;
jane = firstElt.get("name").getStringValue().equals("Jane Doe") ? firstElt : sndElt;
assertEquals(jane.get("job").getRecordValue().get(0).getStringValue(), "Director");
assertTrue(john.get("alias").getRepeatedValue().size() == 2);
assertTrue(john.get("addresses").getRepeatedValue().get(0).getRecordValue().get(0).getStringValue().endsWith("000"));
deleteTable(tableName);
}
@Test
public void PutBigQueryStreamingFullError() throws Exception {
String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
createTable(tableName);
runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.enableControllerService(jsonReader);
runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-bad-data.json"));
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_FAILURE, 1);
runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_FAILURE).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "0");
TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
assertFalse(result.getValues().iterator().hasNext());
deleteTable(tableName);
}
@Test
public void PutBigQueryStreamingPartialError() throws Exception {
String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
createTable(tableName);
runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.enableControllerService(jsonReader);
runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
runner.setProperty(BigQueryAttributes.SKIP_INVALID_ROWS_ATTR, "true");
runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-bad-data.json"));
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_FAILURE, 1);
runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_FAILURE).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "1");
TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
Iterator<FieldValueList> iterator = result.getValues().iterator();
FieldValueList firstElt = iterator.next();
assertFalse(iterator.hasNext());
assertEquals(firstElt.get("name").getStringValue(), "Jane Doe");
deleteTable(tableName);
}
@Test
public void PutBigQueryStreamingNoErrorWithDate() throws Exception {
String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
createTable(tableName);
runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
final String recordSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/bigquery/schema-correct-data-with-date.avsc")));
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, recordSchema);
runner.enableControllerService(jsonReader);
runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-correct-data-with-date.json"));
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "2");
TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
Iterator<FieldValueList> iterator = result.getValues().iterator();
FieldValueList firstElt = iterator.next();
FieldValueList sndElt = iterator.next();
assertTrue(firstElt.get("name").getStringValue().endsWith("Doe"));
assertTrue(sndElt.get("name").getStringValue().endsWith("Doe"));
FieldValueList john;
FieldValueList jane;
john = firstElt.get("name").getStringValue().equals("John Doe") ? firstElt : sndElt;
jane = firstElt.get("name").getStringValue().equals("Jane Doe") ? firstElt : sndElt;
assertEquals(jane.get("job").getRecordValue().get(0).getStringValue(), "Director");
assertTrue(john.get("alias").getRepeatedValue().size() == 2);
assertTrue(john.get("addresses").getRepeatedValue().get(0).getRecordValue().get(0).getStringValue().endsWith("000"));
long timestampRecordJohn = LocalDateTime.parse("07-18-2021 12:35:24",
DateTimeFormatter.ofPattern("MM-dd-yyyy HH:mm:ss")).atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
assertEquals(john.get("birth").getRecordValue().get(0).getStringValue(), "2021-07-18");
assertEquals(john.get("birth").getRecordValue().get(1).getStringValue(), "12:35:24");
assertEquals((john.get("birth").getRecordValue().get(2).getTimestampValue() / 1000), timestampRecordJohn);
long timestampRecordJane = LocalDateTime.parse("01-01-1992 00:00:00",
DateTimeFormatter.ofPattern("MM-dd-yyyy HH:mm:ss")).atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
assertEquals(jane.get("birth").getRecordValue().get(0).getStringValue(), "1992-01-01");
assertEquals(jane.get("birth").getRecordValue().get(1).getStringValue(), "00:00:00");
assertEquals((jane.get("birth").getRecordValue().get(2).getTimestampValue() / 1000) , timestampRecordJane);
deleteTable(tableName);
}
@Test
public void PutBigQueryStreamingNoErrorWithDateFormat() throws Exception {
String tableName = Thread.currentThread().getStackTrace()[1].getMethodName();
createTable(tableName);
runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
final String recordSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/bigquery/schema-correct-data-with-date.avsc")));
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, recordSchema);
runner.setProperty(jsonReader, DateTimeUtils.DATE_FORMAT, "MM/dd/yyyy");
runner.setProperty(jsonReader, DateTimeUtils.TIME_FORMAT, "HH:mm:ss");
runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT, "MM-dd-yyyy HH:mm:ss");
runner.enableControllerService(jsonReader);
runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
runner.enqueue(Paths.get("src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json"));
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQueryStreaming.REL_SUCCESS, 1);
runner.getFlowFilesForRelationship(PutBigQueryStreaming.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, "2");
TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
Iterator<FieldValueList> iterator = result.getValues().iterator();
FieldValueList firstElt = iterator.next();
FieldValueList sndElt = iterator.next();
assertTrue(firstElt.get("name").getStringValue().endsWith("Doe"));
assertTrue(sndElt.get("name").getStringValue().endsWith("Doe"));
FieldValueList john;
FieldValueList jane;
john = firstElt.get("name").getStringValue().equals("John Doe") ? firstElt : sndElt;
jane = firstElt.get("name").getStringValue().equals("Jane Doe") ? firstElt : sndElt;
assertEquals(jane.get("job").getRecordValue().get(0).getStringValue(), "Director");
assertTrue(john.get("alias").getRepeatedValue().size() == 2);
assertTrue(john.get("addresses").getRepeatedValue().get(0).getRecordValue().get(0).getStringValue().endsWith("000"));
long timestampRecordJohn = LocalDateTime.parse("07-18-2021 12:35:24",
DateTimeFormatter.ofPattern("MM-dd-yyyy HH:mm:ss")).atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
assertEquals(john.get("birth").getRecordValue().get(0).getStringValue(), "2021-07-18");
assertEquals(john.get("birth").getRecordValue().get(1).getStringValue(), "12:35:24");
assertEquals(john.get("birth").getRecordValue().get(2).getTimestampValue() / 1000, timestampRecordJohn);
long timestampRecordJane = LocalDateTime.parse("01-01-1992 00:00:00",
DateTimeFormatter.ofPattern("MM-dd-yyyy HH:mm:ss")).atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
assertEquals(jane.get("birth").getRecordValue().get(0).getStringValue(), "1992-01-01");
assertEquals(jane.get("birth").getRecordValue().get(1).getStringValue(), "00:00:00");
assertEquals(jane.get("birth").getRecordValue().get(2).getTimestampValue() / 1000, timestampRecordJane);
deleteTable(tableName);
}
}

View File

@ -0,0 +1,447 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.gcp.bigquery;
import com.google.api.core.ApiFutures;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.protobuf.Descriptors;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import org.apache.nifi.csv.CSVReader;
import org.apache.nifi.csv.CSVUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.apache.nifi.processors.gcp.bigquery.PutBigQuery.BATCH_TYPE;
import static org.apache.nifi.processors.gcp.bigquery.PutBigQuery.STREAM_TYPE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Unit tests for {@link PutBigQuery}.
*/
public class PutBigQueryTest extends AbstractBQTest {
private static final String FIELD_1_NAME = "id";
private static final String FIELD_2_NAME = "value";
private static final String CSV_HEADER = FIELD_1_NAME + "," + FIELD_2_NAME;
private static final String VALUE_PREFIX = "mySpecialValue";
private TestRunner runner;
@Mock
private BigQueryWriteClient writeClient;
@Mock
private WriteStream writeStream;
@Mock
private StreamWriter streamWriter;
@Captor
private ArgumentCaptor<ProtoRows> protoRowsCaptor;
@Captor
private ArgumentCaptor<Long> offsetCaptor;
@Captor
private ArgumentCaptor<BatchCommitWriteStreamsRequest> batchCommitRequestCaptor;
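// Processor subclass wired with the mocked BigQuery service, stream writer and write client.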
@Override
public AbstractBigQueryProcessor getProcessor() {
return new PutBigQuery() {
@Override
protected BigQuery getCloudService() {
return bq;
}
@Override
protected BigQuery getCloudService(final ProcessContext context) {
return bq;
}
@Override
protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials) {
return streamWriter;
}
@Override
protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials) {
return writeClient;
}
};
}
@Override
protected void addRequiredPropertiesToRunner(TestRunner runner) {
runner.setProperty(PutBigQuery.DATASET, DATASET);
runner.setProperty(PutBigQuery.TABLE_NAME, "tableName");
runner.setProperty(PutBigQuery.APPEND_RECORD_COUNT, "999");
runner.setProperty(PutBigQuery.RETRY_COUNT, "999");
runner.setProperty(PutBigQuery.RECORD_READER, "csvReader");
runner.setProperty(PutBigQuery.TRANSFER_TYPE, STREAM_TYPE);
}
@BeforeEach
void setup() throws Exception {
AbstractBigQueryProcessor processor = getProcessor();
runner = buildNewRunner(processor);
decorateWithRecordReader(runner);
addRequiredPropertiesToRunner(runner);
runner.assertValid();
}
@Test
void testMandatoryProjectId() {
runner.removeProperty(PutBigQuery.PROJECT_ID);
runner.assertNotValid();
}
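// Verifies that records are appended in groups of the configured append record count and that the stream offset advances by the group size.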
@ParameterizedTest(name = "{index} => csvLineCount={0}, appendRecordCount={1}")
@MethodSource("generateRecordGroupingParameters")
void testRecordGrouping(Integer csvLineCount, Integer appendRecordCount) {
when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);
TableSchema myTableSchema = mockTableSchema(FIELD_1_NAME, TableFieldSchema.Type.STRING, FIELD_2_NAME, TableFieldSchema.Type.STRING);
when(writeStream.getTableSchema()).thenReturn(myTableSchema);
when(streamWriter.append(isA(ProtoRows.class), isA(Long.class)))
.thenReturn(ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().setAppendResult(mock(AppendRowsResponse.AppendResult.class)).build()));
runner.setProperty(PutBigQuery.APPEND_RECORD_COUNT, appendRecordCount.toString());
runner.enqueue(csvContentWithLines(csvLineCount));
runner.run();
Integer expectedAppendCount = (int) Math.ceil((double) csvLineCount / appendRecordCount);
verify(streamWriter, times(expectedAppendCount)).append(protoRowsCaptor.capture(), offsetCaptor.capture());
List<ProtoRows> allValues = protoRowsCaptor.getAllValues();
List<Long> offsets = offsetCaptor.getAllValues();
assertEquals(expectedAppendCount, allValues.size());
for (int i = 0; i < expectedAppendCount - 1; i++) {
assertEquals(appendRecordCount, allValues.get(i).getSerializedRowsCount());
assertEquals((long) i * appendRecordCount, offsets.get(i));
for (int j = 0; j < appendRecordCount; j++) {
assertTrue(allValues.get(i).getSerializedRowsList().get(j).toString().contains(VALUE_PREFIX + (i * appendRecordCount + j)));
}
}
int lastAppendSize = csvLineCount % appendRecordCount;
if (lastAppendSize != 0) {
assertEquals(lastAppendSize, allValues.get(allValues.size() - 1).getSerializedRowsCount());
for (int j = 0; j < lastAppendSize; j++) {
assertTrue(allValues.get(allValues.size() - 1).getSerializedRowsList().get(j).toString().contains(VALUE_PREFIX + ((expectedAppendCount - 1) * appendRecordCount + j)));
}
}
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS);
}
private static Stream<Arguments> generateRecordGroupingParameters() {
return Stream.of(
Arguments.of(2, 3),
Arguments.of(5, 3),
Arguments.of(5, 5),
Arguments.of(11, 5)
);
}
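// Each FlowFile should result in exactly one append call and one success transfer carrying the matching record count attribute.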
@Test
void testMultipleFlowFiles() {
when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);
TableSchema myTableSchema = mockTableSchema(FIELD_1_NAME, TableFieldSchema.Type.STRING, FIELD_2_NAME, TableFieldSchema.Type.STRING);
when(writeStream.getTableSchema()).thenReturn(myTableSchema);
when(streamWriter.append(isA(ProtoRows.class), isA(Long.class)))
.thenReturn(ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().setAppendResult(mock(AppendRowsResponse.AppendResult.class)).build()));
int entityCountFirst = 3;
int entityCountSecond = 5;
runner.enqueue(csvContentWithLines(entityCountFirst));
runner.enqueue(csvContentWithLines(entityCountSecond));
int iteration = 2;
runner.run(iteration);
verify(streamWriter, times(iteration)).append(protoRowsCaptor.capture(), offsetCaptor.capture());
List<ProtoRows> allValues = protoRowsCaptor.getAllValues();
assertEquals(iteration, allValues.size());
assertEquals(entityCountFirst, allValues.get(0).getSerializedRowsCount());
assertEquals(entityCountSecond, allValues.get(1).getSerializedRowsCount());
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS, iteration);
runner.getFlowFilesForRelationship(PutBigQuery.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Integer.toString(entityCountFirst));
runner.getFlowFilesForRelationship(PutBigQuery.REL_SUCCESS).get(1).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Integer.toString(entityCountSecond));
}
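// When every append fails with a retryable INTERNAL error the processor gives up after the configured retry count and routes the FlowFile to failure.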
@Test
void testRetryAndFailureAfterRetryCount() {
when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);
TableSchema myTableSchema = mockTableSchema(FIELD_1_NAME, TableFieldSchema.Type.STRING, FIELD_2_NAME, TableFieldSchema.Type.STRING);
when(writeStream.getTableSchema()).thenReturn(myTableSchema);
when(streamWriter.append(isA(ProtoRows.class), isA(Long.class))).thenReturn(ApiFutures.immediateFailedFuture(new StatusRuntimeException(Status.INTERNAL)));
int retryCount = 5;
runner.setProperty(PutBigQuery.RETRY_COUNT, Integer.toString(retryCount));
runner.enqueue(csvContentWithLines(1));
runner.run();
verify(streamWriter, times(retryCount + 1)).append(any(ProtoRows.class), anyLong());
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_FAILURE);
}
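// An append that fails twice and then succeeds is retried transparently and the FlowFile still ends up on the success relationship.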
@Test
void testRetryAndSuccessBeforeRetryCount() {
when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);
TableSchema myTableSchema = mockTableSchema(FIELD_1_NAME, TableFieldSchema.Type.STRING, FIELD_2_NAME, TableFieldSchema.Type.STRING);
when(writeStream.getTableSchema()).thenReturn(myTableSchema);
when(streamWriter.append(isA(ProtoRows.class), isA(Long.class)))
.thenReturn(ApiFutures.immediateFailedFuture(new StatusRuntimeException(Status.INTERNAL)))
.thenReturn(ApiFutures.immediateFailedFuture(new StatusRuntimeException(Status.INTERNAL)))
.thenReturn(ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().setAppendResult(mock(AppendRowsResponse.AppendResult.class)).build()));
runner.enqueue(csvContentWithLines(1));
runner.run();
verify(streamWriter, times(3)).append(any(ProtoRows.class), anyLong());
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS);
}
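// In batch mode the write stream has to be finalized and committed through batchCommitWriteStreams once all rows have been appended.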
@Test
void testBatch() {
String streamName = "myStreamName";
when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);
FinalizeWriteStreamResponse finalizeWriteStreamResponse = mock(FinalizeWriteStreamResponse.class);
when(writeClient.finalizeWriteStream(streamName)).thenReturn(finalizeWriteStreamResponse);
BatchCommitWriteStreamsResponse commitResponse = mock(BatchCommitWriteStreamsResponse.class);
when(commitResponse.hasCommitTime()).thenReturn(true);
when(writeClient.batchCommitWriteStreams(isA(BatchCommitWriteStreamsRequest.class))).thenReturn(commitResponse);
TableSchema myTableSchema = mockTableSchema(FIELD_1_NAME, TableFieldSchema.Type.STRING, FIELD_2_NAME, TableFieldSchema.Type.STRING);
when(writeStream.getTableSchema()).thenReturn(myTableSchema);
when(writeStream.getName()).thenReturn(streamName);
when(streamWriter.append(isA(ProtoRows.class), isA(Long.class)))
.thenReturn(ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().setAppendResult(mock(AppendRowsResponse.AppendResult.class)).build()));
runner.setProperty(PutBigQuery.TRANSFER_TYPE, BATCH_TYPE);
runner.assertValid();
runner.enqueue(csvContentWithLines(1));
runner.run();
verify(writeClient).finalizeWriteStream(streamName);
verify(writeClient).batchCommitWriteStreams(batchCommitRequestCaptor.capture());
BatchCommitWriteStreamsRequest batchCommitRequest = batchCommitRequestCaptor.getValue();
assertEquals(streamName, batchCommitRequest.getWriteStreams(0));
verify(streamWriter).append(any(ProtoRows.class), anyLong());
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS);
}
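// Columns that are not part of the destination table schema should be dropped from the serialized rows.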
@Test
void testUnknownColumnSkipped() {
when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);
TableSchema myTableSchema = mockTableSchema(FIELD_1_NAME, TableFieldSchema.Type.STRING, FIELD_2_NAME, TableFieldSchema.Type.STRING);
when(writeStream.getTableSchema()).thenReturn(myTableSchema);
when(streamWriter.append(isA(ProtoRows.class), isA(Long.class)))
.thenReturn(ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().setAppendResult(mock(AppendRowsResponse.AppendResult.class)).build()));
String unknownProperty = "myUnknownProperty";
runner.enqueue(CSV_HEADER + ",unknownProperty\nmyId,myValue," + unknownProperty);
runner.run();
verify(streamWriter).append(protoRowsCaptor.capture(), offsetCaptor.capture());
ProtoRows rows = protoRowsCaptor.getValue();
assertFalse(rows.getSerializedRowsList().get(0).toString().contains(unknownProperty));
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS);
}
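// A record schema whose field types match the table schema (INT64 and STRING) is written successfully.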
@Test
void testSchema() throws Exception {
when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);
TableSchema myTableSchema = mockTableSchema(FIELD_1_NAME, TableFieldSchema.Type.INT64, FIELD_2_NAME, TableFieldSchema.Type.STRING);
when(writeStream.getTableSchema()).thenReturn(myTableSchema);
when(streamWriter.append(isA(ProtoRows.class), isA(Long.class)))
.thenReturn(ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().setAppendResult(mock(AppendRowsResponse.AppendResult.class)).build()));
decorateWithRecordReaderWithSchema(runner);
runner.enqueue(csvContentWithLines(1));
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS);
}
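// A long record field cannot be written into a STRING table column, so the FlowFile is routed to failure.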
@Test
void testSchemaTypeIncompatibility() throws Exception {
when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);
TableSchema myTableSchema = mockTableSchema(FIELD_1_NAME, TableFieldSchema.Type.STRING, FIELD_2_NAME, TableFieldSchema.Type.STRING);
when(writeStream.getTableSchema()).thenReturn(myTableSchema);
decorateWithRecordReaderWithSchema(runner);
runner.enqueue(csvContentWithLines(1));
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_FAILURE);
}
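// If the stream writer cannot be created the FlowFile stays in the incoming queue instead of being routed to success or failure.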
@Test
void testStreamWriterNotInitialized() throws Exception {
AbstractBigQueryProcessor processor = new PutBigQuery() {
@Override
protected BigQuery getCloudService() {
return bq;
}
@Override
protected BigQuery getCloudService(final ProcessContext context) {
return bq;
}
@Override
protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials) throws IOException {
throw new IOException();
}
@Override
protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials) {
return writeClient;
}
};
runner = buildNewRunner(processor);
decorateWithRecordReader(runner);
addRequiredPropertiesToRunner(runner);
when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);
when(writeStream.getTableSchema()).thenReturn(mock(TableSchema.class));
runner.enqueue(csvContentWithLines(1));
runner.run();
runner.assertQueueNotEmpty();
runner.assertTransferCount(PutBigQuery.REL_FAILURE, 0);
runner.assertTransferCount(PutBigQuery.REL_SUCCESS, 0);
}
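// Registers a CSVReader that derives its schema from the CSV header line.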
private void decorateWithRecordReader(TestRunner runner) throws InitializationException {
CSVReader csvReader = new CSVReader();
runner.addControllerService("csvReader", csvReader);
runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "true");
runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "csv-header-derived");
runner.enableControllerService(csvReader);
}
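// Registers a CSVReader with an explicit Avro schema (id: long, value: string) instead of deriving it from the header.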
private void decorateWithRecordReaderWithSchema(TestRunner runner) throws InitializationException {
String recordReaderSchema = "{\n" +
" \"name\": \"recordFormatName\",\n" +
" \"namespace\": \"nifi.examples\",\n" +
" \"type\": \"record\",\n" +
" \"fields\": [\n" +
" { \"name\": \"" + FIELD_1_NAME + "\", \"type\": \"long\" },\n" +
" { \"name\": \"" + FIELD_2_NAME + "\", \"type\": \"string\" }\n" +
" ]\n" +
"}";
CSVReader csvReader = new CSVReader();
runner.addControllerService("csvReader", csvReader);
runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "true");
runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_TEXT, recordReaderSchema);
runner.enableControllerService(csvReader);
}
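// Builds a mocked two-column table schema with the given field names and types, both NULLABLE.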
private TableSchema mockTableSchema(String name1, TableFieldSchema.Type type1, String name2, TableFieldSchema.Type type2) {
TableSchema myTableSchema = mock(TableSchema.class);
TableFieldSchema tableFieldSchemaId = mock(TableFieldSchema.class);
when(tableFieldSchemaId.getMode()).thenReturn(TableFieldSchema.Mode.NULLABLE);
when(tableFieldSchemaId.getType()).thenReturn(type1);
when(tableFieldSchemaId.getName()).thenReturn(name1);
TableFieldSchema tableFieldSchemaValue = mock(TableFieldSchema.class);
when(tableFieldSchemaValue.getMode()).thenReturn(TableFieldSchema.Mode.NULLABLE);
when(tableFieldSchemaValue.getType()).thenReturn(type2);
when(tableFieldSchemaValue.getName()).thenReturn(name2);
when(myTableSchema.getFieldsList()).thenReturn(Arrays.asList(tableFieldSchemaId, tableFieldSchemaValue));
return myTableSchema;
}
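// Generates CSV content with a header row followed by the requested number of data rows.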
private String csvContentWithLines(int lineNum) {
StringBuilder builder = new StringBuilder();
builder.append(CSV_HEADER);
IntStream.range(0, lineNum).forEach(x -> {
builder.append("\n");
builder.append(x);
builder.append(",");
builder.append(VALUE_PREFIX).append(x);
});
return builder.toString();
}
}

View File

@ -20,7 +20,7 @@
"birth": {
"date": "07/18/2021",
"time": "12:35:24",
"full": "07-18-2021 12:35:24"
"full": "07-18-2021 12:35:24 UTC"
}
},
{
@ -40,7 +40,7 @@
"birth": {
"date": "01/01/1992",
"time": "00:00:00",
"full": "01-01-1992 00:00:00"
"full": "01-01-1992 00:00:00 UTC"
}
}
]