NIFI-11823 - fix NUMERIC support in PutBigQuery

This closes #7489.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
Pierre Villard 2023-07-17 16:55:24 +02:00 committed by Peter Turcsanyi
parent 485112e54f
commit b056bf8f7b
12 changed files with 292 additions and 43 deletions
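
At the core of the change, NUMERIC and BIGNUMERIC columns travel over the Storage Write API as BYTES proto fields, so plain Java numbers read from a NiFi Record have to be widened to BigDecimal and encoded with BigDecimalByteStringEncoder before they can be set on the row message. A minimal standalone sketch of that conversion step (illustration only, not code from this commit; the class and variable names are made up):

import com.google.cloud.bigquery.storage.v1.BigDecimalByteStringEncoder;
import com.google.protobuf.ByteString;
import java.math.BigDecimal;

// Illustrates the encoding this commit adds to ProtoUtils: widen a plain Java number
// to BigDecimal, then encode it into the scaled byte form BigQuery expects for
// NUMERIC (or BIGNUMERIC) columns over the Storage Write API.
public class NumericEncodingSketch {
    public static void main(String[] args) {
        Object value = 42L; // e.g. a Long read from a NiFi Record field
        BigDecimal decimal = new BigDecimal(value.toString());

        ByteString numeric = BigDecimalByteStringEncoder.encodeToNumericByteString(decimal);
        ByteString bigNumeric = BigDecimalByteStringEncoder.encodeToBigNumericByteString(decimal);

        System.out.println("NUMERIC payload size: " + numeric.size());
        System.out.println("BIGNUMERIC payload size: " + bigNumeric.size());
    }
}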

View File

@@ -198,6 +198,12 @@
<exclude>src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker</exclude>
<exclude>src/test/resources/mock-gcp-service-account.json</exclude>
<exclude>src/test/resources/mock-gcp-application-default-credentials.json</exclude>
<exclude>src/test/resources/bigquery/avrodecimal.avsc</exclude>
<exclude>src/test/resources/bigquery/avrodecimal.avro</exclude>
<exclude>src/test/resources/bigquery/avrofloat.avsc</exclude>
<exclude>src/test/resources/bigquery/avrofloat.avro</exclude>
<exclude>src/test/resources/bigquery/avroint.avsc</exclude>
<exclude>src/test/resources/bigquery/avroint.avro</exclude>
<exclude>src/test/resources/bigquery/streaming-bad-data.json</exclude>
<exclude>src/test/resources/bigquery/streaming-correct-data.json</exclude>
<exclude>src/test/resources/bigquery/schema-correct-data-with-date.avsc</exclude>

View File

@@ -40,6 +40,7 @@ import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
import com.google.cloud.bigquery.storage.v1.StorageError;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.storage.v1.stub.BigQueryWriteStubSettings;
import com.google.protobuf.Descriptors;
@@ -220,9 +221,11 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
WriteStream writeStream;
Descriptors.Descriptor protoDescriptor;
TableSchema tableSchema;
try {
writeStream = createWriteStream(tableName);
protoDescriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(writeStream.getTableSchema());
tableSchema = writeStream.getTableSchema();
protoDescriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(tableSchema);
streamWriter = createStreamWriter(writeStream.getName(), protoDescriptor, getGoogleCredentials(context));
} catch (Descriptors.DescriptorValidationException | IOException e) {
getLogger().error("Failed to create Big Query Stream Writer for writing", e);
@@ -238,7 +241,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
try {
try (InputStream in = session.read(flowFile);
RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
recordNumWritten = writeRecordsToStream(reader, protoDescriptor, skipInvalidRows);
recordNumWritten = writeRecordsToStream(reader, protoDescriptor, skipInvalidRows, tableSchema);
}
flowFile = session.putAttribute(flowFile, BigQueryAttributes.JOB_NB_RECORDS_ATTR, Integer.toString(recordNumWritten));
} catch (Exception e) {
@@ -248,13 +251,13 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
}
}
private int writeRecordsToStream(RecordReader reader, Descriptors.Descriptor descriptor, boolean skipInvalidRows) throws Exception {
private int writeRecordsToStream(RecordReader reader, Descriptors.Descriptor descriptor, boolean skipInvalidRows, TableSchema tableSchema) throws Exception {
Record currentRecord;
int offset = 0;
int recordNum = 0;
ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
while ((currentRecord = reader.nextRecord()) != null) {
DynamicMessage message = recordToProtoMessage(currentRecord, descriptor, skipInvalidRows);
DynamicMessage message = recordToProtoMessage(currentRecord, descriptor, skipInvalidRows, tableSchema);
if (message == null) {
continue;
@@ -276,11 +279,11 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
return recordNum;
}
private DynamicMessage recordToProtoMessage(Record record, Descriptors.Descriptor descriptor, boolean skipInvalidRows) {
private DynamicMessage recordToProtoMessage(Record record, Descriptors.Descriptor descriptor, boolean skipInvalidRows, TableSchema tableSchema) {
Map<String, Object> valueMap = convertMapRecord(record.toMap());
DynamicMessage message = null;
try {
message = ProtoUtils.createMessage(descriptor, valueMap);
message = ProtoUtils.createMessage(descriptor, valueMap, tableSchema);
} catch (RuntimeException e) {
getLogger().error("Cannot convert record to message", e);
if (!skipInvalidRows) {

View File

@@ -17,50 +17,98 @@
package org.apache.nifi.processors.gcp.bigquery.proto;
import com.google.cloud.bigquery.storage.v1.BigDecimalByteStringEncoder;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
/**
 * Util class for protocol buffer messaging
 */
public class ProtoUtils {
public static DynamicMessage createMessage(Descriptors.Descriptor descriptor, Map<String, Object> valueMap) {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
for (Descriptors.FieldDescriptor field : descriptor.getFields()) {
String name = field.getName();
Object value = valueMap.get(name);
if (value == null) {
continue;
}
if (Descriptors.FieldDescriptor.Type.MESSAGE.equals(field.getType())) {
if (field.isRepeated()) {
Collection collection = value.getClass().isArray() ? Arrays.asList((Object[]) value) : (Collection) value;
collection.forEach(act -> builder.addRepeatedField(field, createMessage(field.getMessageType(), (Map<String, Object>) act)));
} else {
builder.setField(field, createMessage(field.getMessageType(), (Map<String, Object>) value));
}
} else {
// Integer in the bigquery table schema maps back to INT64 which is considered to be Long on Java side:
// https://developers.google.com/protocol-buffers/docs/proto3
if (value instanceof Integer && (field.getType() == Descriptors.FieldDescriptor.Type.INT64)) {
value = Long.valueOf((Integer) value);
}
if (field.isRepeated()) {
Collection collection = value.getClass().isArray() ? Arrays.asList((Object[]) value) : (Collection) value;
collection.forEach(act -> builder.addRepeatedField(field, act));
} else {
builder.setField(field, value);
}
}
}
return builder.build();
}
}
public static DynamicMessage createMessage(Descriptors.Descriptor descriptor, Map<String, Object> valueMap, TableSchema tableSchema) {
final DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
for (final Descriptors.FieldDescriptor field : descriptor.getFields()) {
final String name = field.getName();
Object value = valueMap.get(name);
if (value == null) {
continue;
}
switch (field.getType()) {
case MESSAGE:
if (field.isRepeated()) {
Collection collection = value.getClass().isArray() ? Arrays.asList((Object[]) value) : (Collection) value;
collection.forEach(act -> builder.addRepeatedField(field, createMessage(field.getMessageType(), (Map<String, Object>) act, tableSchema)));
} else {
builder.setField(field, createMessage(field.getMessageType(), (Map<String, Object>) value, tableSchema));
}
break;
// INT64 with alias INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT
case INT64:
// Integer in the bigquery table schema maps back to INT64 which is considered to be Long on Java side:
// https://developers.google.com/protocol-buffers/docs/proto3
if (value instanceof Integer) {
value = Long.valueOf((Integer) value);
}
setField(value, field, builder);
break;
// FLOAT64
case DOUBLE:
if (value instanceof Float) {
value = Double.valueOf(value.toString());
}
setField(value, field, builder);
break;
// matches NUMERIC and BIGNUMERIC types in BigQuery
// BQTableSchemaToProtoDescriptor.class
case BYTES:
if (value instanceof Integer) {
value = new BigDecimal((int) value);
} else if (value instanceof Long) {
value = new BigDecimal((long) value);
} else if (value instanceof Float || value instanceof Double) {
value = new BigDecimal(value.toString());
}
if (value instanceof BigDecimal) {
if (tableSchema.getFields(field.getIndex()).getType().equals(Type.BIGNUMERIC)) {
value = BigDecimalByteStringEncoder.encodeToBigNumericByteString((BigDecimal) value);
} else if (tableSchema.getFields(field.getIndex()).getType().equals(Type.NUMERIC)) {
value = BigDecimalByteStringEncoder.encodeToNumericByteString((BigDecimal) value);
}
}
setField(value, field, builder);
break;
default:
setField(value, field, builder);
break;
}
}
return builder.build();
}
private static void setField(final Object value, final Descriptors.FieldDescriptor field, final DynamicMessage.Builder builder) {
if (field.isRepeated()) {
Collection collection = value.getClass().isArray() ? Arrays.asList((Object[]) value) : (Collection) value;
collection.forEach(act -> builder.addRepeatedField(field, act));
} else {
builder.setField(field, value);
}
}
}
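
To see the new path end to end, a hypothetical harness along these lines (not part of the commit; class and column names are illustrative) builds a single-column NUMERIC table schema, derives the proto descriptor the same way PutBigQuery does, and lets the three-argument createMessage() encode a plain Long:

import com.google.cloud.bigquery.storage.v1.BQTableSchemaToProtoDescriptor;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import org.apache.nifi.processors.gcp.bigquery.proto.ProtoUtils;
import java.util.Map;

public class ProtoUtilsNumericSketch {
    public static void main(String[] args) throws Descriptors.DescriptorValidationException {
        // One NULLABLE NUMERIC column, mirroring the schema PutBigQuery reads back from the write stream.
        TableSchema tableSchema = TableSchema.newBuilder()
                .addFields(TableFieldSchema.newBuilder()
                        .setName("numeric")
                        .setType(TableFieldSchema.Type.NUMERIC)
                        .setMode(TableFieldSchema.Mode.NULLABLE)
                        .build())
                .build();

        // Same descriptor derivation PutBigQuery performs before creating the StreamWriter.
        Descriptors.Descriptor descriptor =
                BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(tableSchema);

        // The Long is widened to BigDecimal and encoded to the NUMERIC ByteString inside createMessage().
        DynamicMessage message = ProtoUtils.createMessage(descriptor, Map.of("numeric", 1234L), tableSchema);
        System.out.println(message);
    }
}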

View File

@@ -21,6 +21,7 @@ import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
@@ -29,6 +30,8 @@ import com.google.cloud.bigquery.TableResult;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import org.apache.nifi.avro.AvroReader;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
@@ -227,6 +230,129 @@ public class PutBigQueryIT extends AbstractBigQueryIT {
runner.getFlowFilesForRelationship(PutBigQuery.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Integer.toString(recordCount));
}
@Test
public void testAvroDecimalType() throws InitializationException, IOException {
String tableName = UUID.randomUUID().toString();
TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName);
Field avrodecimal = Field.newBuilder("avrodecimal", StandardSQLTypeName.BIGNUMERIC).setMode(Field.Mode.NULLABLE).build();
// Table schema definition
schema = Schema.of(avrodecimal);
TableDefinition tableDefinition = StandardTableDefinition.of(schema);
TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
// create table
bigquery.create(tableInfo);
runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
runner.setProperty(PutBigQuery.TRANSFER_TYPE, BATCH_TYPE);
AvroReader reader = new AvroReader();
runner.addControllerService("reader", reader);
final String recordSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/bigquery/avrodecimal.avsc")));
runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(reader, SchemaAccessUtils.SCHEMA_TEXT, recordSchema);
runner.enableControllerService(reader);
runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
runner.enqueue(Paths.get("src/test/resources/bigquery/avrodecimal.avro"));
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS, 1);
TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
Iterator<FieldValueList> iterator = result.getValues().iterator();
FieldValueList firstElt = iterator.next();
assertEquals(firstElt.get(0).getNumericValue().intValue(), 0);
deleteTable(tableName);
}
@Test
public void testAvroFloatType() throws InitializationException, IOException {
String tableName = UUID.randomUUID().toString();
TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName);
Field avrofloat = Field.newBuilder("avrofloat", StandardSQLTypeName.FLOAT64).setMode(Field.Mode.NULLABLE).build();
// Table schema definition
schema = Schema.of(avrofloat);
TableDefinition tableDefinition = StandardTableDefinition.of(schema);
TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
// create table
bigquery.create(tableInfo);
runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
runner.setProperty(PutBigQuery.TRANSFER_TYPE, BATCH_TYPE);
AvroReader reader = new AvroReader();
runner.addControllerService("reader", reader);
final String recordSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/bigquery/avrofloat.avsc")));
runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(reader, SchemaAccessUtils.SCHEMA_TEXT, recordSchema);
runner.enableControllerService(reader);
runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
runner.enqueue(Paths.get("src/test/resources/bigquery/avrofloat.avro"));
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS, 1);
TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
Iterator<FieldValueList> iterator = result.getValues().iterator();
FieldValueList firstElt = iterator.next();
assertEquals(firstElt.get(0).getDoubleValue(), 1.0);
deleteTable(tableName);
}
@Test
public void testAvroIntType() throws InitializationException, IOException {
String tableName = UUID.randomUUID().toString();
TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName);
Field avroint = Field.newBuilder("avroint", StandardSQLTypeName.INT64).setMode(Field.Mode.NULLABLE).build();
// Table schema definition
schema = Schema.of(avroint);
TableDefinition tableDefinition = StandardTableDefinition.of(schema);
TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
// create table
bigquery.create(tableInfo);
runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
runner.setProperty(PutBigQuery.TRANSFER_TYPE, BATCH_TYPE);
AvroReader reader = new AvroReader();
runner.addControllerService("reader", reader);
final String recordSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/bigquery/avroint.avsc")));
runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(reader, SchemaAccessUtils.SCHEMA_TEXT, recordSchema);
runner.enableControllerService(reader);
runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader");
runner.enqueue(Paths.get("src/test/resources/bigquery/avroint.avro"));
runner.run();
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS, 1);
TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema);
Iterator<FieldValueList> iterator = result.getValues().iterator();
FieldValueList firstElt = iterator.next();
assertEquals(firstElt.get(0).getDoubleValue(), 1.0);
deleteTable(tableName);
}
private String prepareTable(AllowableValue transferType) {
String tableName = UUID.randomUUID().toString();
@@ -284,8 +410,12 @@ public class PutBigQueryIT extends AbstractBigQueryIT {
Field full = Field.newBuilder("full", LegacySQLTypeName.TIMESTAMP).setMode(Field.Mode.NULLABLE).build();
Field birth = Field.newBuilder("birth", LegacySQLTypeName.RECORD, date, time, full).setMode(Field.Mode.NULLABLE).build();
Field numeric = Field.newBuilder("numeric", StandardSQLTypeName.NUMERIC).setMode(Field.Mode.NULLABLE).build();
Field floatc = Field.newBuilder("floatc", StandardSQLTypeName.FLOAT64).setMode(Field.Mode.NULLABLE).build();
Field json = Field.newBuilder("json", StandardSQLTypeName.JSON).setMode(Field.Mode.NULLABLE).build();
// Table schema definition
schema = Schema.of(id, name, alias, addresses, job, birth);
schema = Schema.of(id, name, alias, addresses, job, birth, numeric, floatc, json);
TableDefinition tableDefinition = StandardTableDefinition.of(schema);
TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();

View File

@@ -0,0 +1,19 @@
{
"type": "record",
"name": "nifiRecord",
"namespace": "org.apache.nifi",
"fields": [
{
"name": "avrodecimal",
"type": [
{
"type": "bytes",
"logicalType": "decimal",
"precision": 10,
"scale": 0
},
"null"
]
}
]
}
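
The binary fixture avrodecimal.avro itself is not shown here; a file matching this schema could be produced with a sketch along these lines (hypothetical helper using standard Avro APIs), writing the single record with value 0 that the integration test asserts on:

import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import java.io.File;
import java.math.BigDecimal;
import java.nio.ByteBuffer;

public class AvroDecimalFixture {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(new File("src/test/resources/bigquery/avrodecimal.avsc"));
        // First branch of the union is the bytes type carrying the decimal logical type.
        Schema decimalSchema = schema.getField("avrodecimal").schema().getTypes().get(0);

        // Encode BigDecimal 0 into the two's-complement byte form used by the decimal logical type.
        ByteBuffer encoded = new Conversions.DecimalConversion()
                .toBytes(new BigDecimal("0"), decimalSchema, decimalSchema.getLogicalType());

        GenericRecord record = new GenericData.Record(schema);
        record.put("avrodecimal", encoded);

        try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
            writer.create(schema, new File("src/test/resources/bigquery/avrodecimal.avro"));
            writer.append(record);
        }
    }
}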

View File

@@ -0,0 +1,14 @@
{
"type": "record",
"name": "nifiRecord",
"namespace": "org.apache.nifi",
"fields": [
{
"name": "avrofloat",
"type": [
"float",
"null"
]
}
]
}

View File

@@ -0,0 +1,14 @@
{
"type": "record",
"name": "nifiRecord",
"namespace": "org.apache.nifi",
"fields": [
{
"name": "avroint",
"type": [
"int",
"null"
]
}
]
}

View File

@@ -87,6 +87,18 @@
}
]
}
},
{
"name": "numeric",
"type": ["null", "long"]
},
{
"name": "floatc",
"type": ["null", "double"]
},
{
"name": "json",
"type": ["null", "string"]
}
]
}

View File

@@ -21,7 +21,10 @@
"date": "07/18/2021",
"time": "12:35:24",
"full": "07-18-2021 12:35:24 UTC"
}
},
"numeric": 0,
"floatc": 0.1,
"json": "{\"key\":\"value\"}"
},
{
"id": 2,
@@ -43,4 +46,4 @@
"full": "01-01-1992 00:00:00 UTC"
}
}
]
]