diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml index ae624dba29..84e84417ea 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml @@ -198,6 +198,12 @@ src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker src/test/resources/mock-gcp-service-account.json src/test/resources/mock-gcp-application-default-credentials.json + src/test/resources/bigquery/avrodecimal.avsc + src/test/resources/bigquery/avrodecimal.avro + src/test/resources/bigquery/avrofloat.avsc + src/test/resources/bigquery/avrofloat.avro + src/test/resources/bigquery/avroint.avsc + src/test/resources/bigquery/avroint.avro src/test/resources/bigquery/streaming-bad-data.json src/test/resources/bigquery/streaming-correct-data.json src/test/resources/bigquery/schema-correct-data-with-date.avsc diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java index 057d6d43fe..0b17f0d939 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java @@ -40,6 +40,7 @@ import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter; import com.google.cloud.bigquery.storage.v1.StorageError; import com.google.cloud.bigquery.storage.v1.StreamWriter; import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.cloud.bigquery.storage.v1.TableSchema; import com.google.cloud.bigquery.storage.v1.WriteStream; import com.google.cloud.bigquery.storage.v1.stub.BigQueryWriteStubSettings; import com.google.protobuf.Descriptors; @@ -220,9 +221,11 @@ public class PutBigQuery extends AbstractBigQueryProcessor { WriteStream writeStream; Descriptors.Descriptor protoDescriptor; + TableSchema tableSchema; try { writeStream = createWriteStream(tableName); - protoDescriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(writeStream.getTableSchema()); + tableSchema = writeStream.getTableSchema(); + protoDescriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(tableSchema); streamWriter = createStreamWriter(writeStream.getName(), protoDescriptor, getGoogleCredentials(context)); } catch (Descriptors.DescriptorValidationException | IOException e) { getLogger().error("Failed to create Big Query Stream Writer for writing", e); @@ -238,7 +241,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor { try { try (InputStream in = session.read(flowFile); RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) { - recordNumWritten = writeRecordsToStream(reader, protoDescriptor, skipInvalidRows); + recordNumWritten = writeRecordsToStream(reader, protoDescriptor, skipInvalidRows, tableSchema); } flowFile = session.putAttribute(flowFile, BigQueryAttributes.JOB_NB_RECORDS_ATTR, Integer.toString(recordNumWritten)); } catch (Exception e) { @@ -248,13 +251,13 @@ public class PutBigQuery extends AbstractBigQueryProcessor { } } - private int writeRecordsToStream(RecordReader reader, Descriptors.Descriptor descriptor, boolean skipInvalidRows) throws Exception { + private int writeRecordsToStream(RecordReader reader, Descriptors.Descriptor descriptor, boolean skipInvalidRows, TableSchema tableSchema) throws Exception { Record currentRecord; int offset = 0; int recordNum = 0; ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder(); while ((currentRecord = reader.nextRecord()) != null) { - DynamicMessage message = recordToProtoMessage(currentRecord, descriptor, skipInvalidRows); + DynamicMessage message = recordToProtoMessage(currentRecord, descriptor, skipInvalidRows, tableSchema); if (message == null) { continue; @@ -276,11 +279,11 @@ public class PutBigQuery extends AbstractBigQueryProcessor { return recordNum; } - private DynamicMessage recordToProtoMessage(Record record, Descriptors.Descriptor descriptor, boolean skipInvalidRows) { + private DynamicMessage recordToProtoMessage(Record record, Descriptors.Descriptor descriptor, boolean skipInvalidRows, TableSchema tableSchema) { Map valueMap = convertMapRecord(record.toMap()); DynamicMessage message = null; try { - message = ProtoUtils.createMessage(descriptor, valueMap); + message = ProtoUtils.createMessage(descriptor, valueMap, tableSchema); } catch (RuntimeException e) { getLogger().error("Cannot convert record to message", e); if (!skipInvalidRows) { diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java index e275933a12..823e0e11d0 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java @@ -17,50 +17,98 @@ package org.apache.nifi.processors.gcp.bigquery.proto; +import com.google.cloud.bigquery.storage.v1.BigDecimalByteStringEncoder; +import com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type; +import com.google.cloud.bigquery.storage.v1.TableSchema; import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; + +import java.math.BigDecimal; import java.util.Arrays; import java.util.Collection; import java.util.Map; /** - * Util class for protocol buffer messaging - */ +* Util class for protocol buffer messaging +*/ public class ProtoUtils { - public static DynamicMessage createMessage(Descriptors.Descriptor descriptor, Map valueMap) { - DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor); + public static DynamicMessage createMessage(Descriptors.Descriptor descriptor, Map valueMap, TableSchema tableSchema) { + final DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor); - for (Descriptors.FieldDescriptor field : descriptor.getFields()) { - String name = field.getName(); - Object value = valueMap.get(name); - if (value == null) { - continue; - } + for (final Descriptors.FieldDescriptor field : descriptor.getFields()) { + final String name = field.getName(); + Object value = valueMap.get(name); + if (value == null) { + continue; + } - if (Descriptors.FieldDescriptor.Type.MESSAGE.equals(field.getType())) { - if (field.isRepeated()) { - Collection collection = value.getClass().isArray() ? Arrays.asList((Object[]) value) : (Collection) value; - collection.forEach(act -> builder.addRepeatedField(field, createMessage(field.getMessageType(), (Map) act))); - } else { - builder.setField(field, createMessage(field.getMessageType(), (Map) value)); - } - } else { - // Integer in the bigquery table schema maps back to INT64 which is considered to be Long on Java side: - // https://developers.google.com/protocol-buffers/docs/proto3 - if (value instanceof Integer && (field.getType() == Descriptors.FieldDescriptor.Type.INT64)) { - value = Long.valueOf((Integer) value); - } + switch (field.getType()) { + case MESSAGE: + if (field.isRepeated()) { + Collection collection = value.getClass().isArray() ? Arrays.asList((Object[]) value) : (Collection) value; + collection.forEach(act -> builder.addRepeatedField(field, createMessage(field.getMessageType(), (Map) act, tableSchema))); + } else { + builder.setField(field, createMessage(field.getMessageType(), (Map) value, tableSchema)); + } + break; - if (field.isRepeated()) { - Collection collection = value.getClass().isArray() ? Arrays.asList((Object[]) value) : (Collection) value; - collection.forEach(act -> builder.addRepeatedField(field, act)); - } else { - builder.setField(field, value); - } - } - } + // INT64 with alias INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT + case INT64: + // Integer in the bigquery table schema maps back to INT64 which is considered to be Long on Java side: + // https://developers.google.com/protocol-buffers/docs/proto3 + if (value instanceof Integer) { + value = Long.valueOf((Integer) value); + } - return builder.build(); - } -} + setField(value, field, builder); + break; + + // FLOAT64 + case DOUBLE: + if (value instanceof Float) { + value = Double.valueOf(value.toString()); + } + setField(value, field, builder); + break; + + // matches NUMERIC and BIGNUMERIC types in BigQuery + // BQTableSchemaToProtoDescriptor.class + case BYTES: + if (value instanceof Integer) { + value = new BigDecimal((int) value); + } else if (value instanceof Long) { + value = new BigDecimal((long) value); + } else if (value instanceof Float || value instanceof Double) { + value = new BigDecimal(value.toString()); + } + + if (value instanceof BigDecimal) { + if (tableSchema.getFields(field.getIndex()).getType().equals(Type.BIGNUMERIC)) { + value = BigDecimalByteStringEncoder.encodeToBigNumericByteString((BigDecimal) value); + } else if (tableSchema.getFields(field.getIndex()).getType().equals(Type.NUMERIC)) { + value = BigDecimalByteStringEncoder.encodeToNumericByteString((BigDecimal) value); + } + } + + setField(value, field, builder); + break; + + default: + setField(value, field, builder); + break; + } + } + + return builder.build(); + } + + private static void setField(final Object value, final Descriptors.FieldDescriptor field, final DynamicMessage.Builder builder) { + if (field.isRepeated()) { + Collection collection = value.getClass().isArray() ? Arrays.asList((Object[]) value) : (Collection) value; + collection.forEach(act -> builder.addRepeatedField(field, act)); + } else { + builder.setField(field, value); + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryIT.java index c25cd80b6e..2f31f40c2a 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryIT.java +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryIT.java @@ -21,6 +21,7 @@ import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.FieldValueList; import com.google.cloud.bigquery.LegacySQLTypeName; import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.TableDefinition; import com.google.cloud.bigquery.TableId; @@ -29,6 +30,8 @@ import com.google.cloud.bigquery.TableResult; import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; + +import org.apache.nifi.avro.AvroReader; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.processors.gcp.AbstractGCPProcessor; @@ -227,6 +230,129 @@ public class PutBigQueryIT extends AbstractBigQueryIT { runner.getFlowFilesForRelationship(PutBigQuery.REL_SUCCESS).get(0).assertAttributeEquals(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Integer.toString(recordCount)); } + @Test + public void testAvroDecimalType() throws InitializationException, IOException { + String tableName = UUID.randomUUID().toString(); + TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName); + Field avrodecimal = Field.newBuilder("avrodecimal", StandardSQLTypeName.BIGNUMERIC).setMode(Field.Mode.NULLABLE).build(); + + // Table schema definition + schema = Schema.of(avrodecimal); + TableDefinition tableDefinition = StandardTableDefinition.of(schema); + TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build(); + + // create table + bigquery.create(tableInfo); + + runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset()); + runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName); + runner.setProperty(PutBigQuery.TRANSFER_TYPE, BATCH_TYPE); + + AvroReader reader = new AvroReader(); + runner.addControllerService("reader", reader); + + final String recordSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/bigquery/avrodecimal.avsc"))); + runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(reader, SchemaAccessUtils.SCHEMA_TEXT, recordSchema); + + runner.enableControllerService(reader); + runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader"); + + runner.enqueue(Paths.get("src/test/resources/bigquery/avrodecimal.avro")); + + runner.run(); + runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS, 1); + + TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema); + Iterator iterator = result.getValues().iterator(); + FieldValueList firstElt = iterator.next(); + assertEquals(firstElt.get(0).getNumericValue().intValue(), 0); + + deleteTable(tableName); + } + + @Test + public void testAvroFloatType() throws InitializationException, IOException { + String tableName = UUID.randomUUID().toString(); + TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName); + Field avrofloat = Field.newBuilder("avrofloat", StandardSQLTypeName.FLOAT64).setMode(Field.Mode.NULLABLE).build(); + + // Table schema definition + schema = Schema.of(avrofloat); + TableDefinition tableDefinition = StandardTableDefinition.of(schema); + TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build(); + + // create table + bigquery.create(tableInfo); + + runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset()); + runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName); + runner.setProperty(PutBigQuery.TRANSFER_TYPE, BATCH_TYPE); + + AvroReader reader = new AvroReader(); + runner.addControllerService("reader", reader); + + final String recordSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/bigquery/avrofloat.avsc"))); + runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(reader, SchemaAccessUtils.SCHEMA_TEXT, recordSchema); + + runner.enableControllerService(reader); + runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader"); + + runner.enqueue(Paths.get("src/test/resources/bigquery/avrofloat.avro")); + + runner.run(); + runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS, 1); + + TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema); + Iterator iterator = result.getValues().iterator(); + FieldValueList firstElt = iterator.next(); + assertEquals(firstElt.get(0).getDoubleValue(), 1.0); + + deleteTable(tableName); + } + + @Test + public void testAvroIntType() throws InitializationException, IOException { + String tableName = UUID.randomUUID().toString(); + TableId tableId = TableId.of(dataset.getDatasetId().getDataset(), tableName); + Field avrofloat = Field.newBuilder("avroint", StandardSQLTypeName.INT64).setMode(Field.Mode.NULLABLE).build(); + + // Table schema definition + schema = Schema.of(avrofloat); + TableDefinition tableDefinition = StandardTableDefinition.of(schema); + TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build(); + + // create table + bigquery.create(tableInfo); + + runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset()); + runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName); + runner.setProperty(PutBigQuery.TRANSFER_TYPE, BATCH_TYPE); + + AvroReader reader = new AvroReader(); + runner.addControllerService("reader", reader); + + final String recordSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/bigquery/avroint.avsc"))); + runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(reader, SchemaAccessUtils.SCHEMA_TEXT, recordSchema); + + runner.enableControllerService(reader); + runner.setProperty(BigQueryAttributes.RECORD_READER_ATTR, "reader"); + + runner.enqueue(Paths.get("src/test/resources/bigquery/avroint.avro")); + + runner.run(); + runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS, 1); + + TableResult result = bigquery.listTableData(dataset.getDatasetId().getDataset(), tableName, schema); + Iterator iterator = result.getValues().iterator(); + FieldValueList firstElt = iterator.next(); + assertEquals(firstElt.get(0).getDoubleValue(), 1.0); + + deleteTable(tableName); + } + private String prepareTable(AllowableValue transferType) { String tableName = UUID.randomUUID().toString(); @@ -284,8 +410,12 @@ public class PutBigQueryIT extends AbstractBigQueryIT { Field full = Field.newBuilder("full", LegacySQLTypeName.TIMESTAMP).setMode(Field.Mode.NULLABLE).build(); Field birth = Field.newBuilder("birth", LegacySQLTypeName.RECORD, date, time, full).setMode(Field.Mode.NULLABLE).build(); + Field numeric = Field.newBuilder("numeric", StandardSQLTypeName.NUMERIC).setMode(Field.Mode.NULLABLE).build(); + Field floatc = Field.newBuilder("floatc", StandardSQLTypeName.FLOAT64).setMode(Field.Mode.NULLABLE).build(); + Field json = Field.newBuilder("json", StandardSQLTypeName.JSON).setMode(Field.Mode.NULLABLE).build(); + // Table schema definition - schema = Schema.of(id, name, alias, addresses, job, birth); + schema = Schema.of(id, name, alias, addresses, job, birth, numeric, floatc, json); TableDefinition tableDefinition = StandardTableDefinition.of(schema); TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build(); diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrodecimal.avro b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrodecimal.avro new file mode 100644 index 0000000000..f4cb65349b Binary files /dev/null and b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrodecimal.avro differ diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrodecimal.avsc b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrodecimal.avsc new file mode 100644 index 0000000000..f8d9356462 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrodecimal.avsc @@ -0,0 +1,19 @@ +{ + "type": "record", + "name": "nifiRecord", + "namespace": "org.apache.nifi", + "fields": [ + { + "name": "avrodecimal", + "type": [ + { + "type": "bytes", + "logicalType": "decimal", + "precision": 10, + "scale": 0 + }, + "null" + ] + } + ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrofloat.avro b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrofloat.avro new file mode 100644 index 0000000000..64cca75166 Binary files /dev/null and b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrofloat.avro differ diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrofloat.avsc b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrofloat.avsc new file mode 100644 index 0000000000..a946de0213 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avrofloat.avsc @@ -0,0 +1,14 @@ +{ + "type": "record", + "name": "nifiRecord", + "namespace": "org.apache.nifi", + "fields": [ + { + "name": "avrofloat", + "type": [ + "float", + "null" + ] + } + ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avroint.avro b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avroint.avro new file mode 100644 index 0000000000..8835708590 Binary files /dev/null and b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avroint.avro differ diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avroint.avsc b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avroint.avsc new file mode 100644 index 0000000000..bc9addeb10 --- /dev/null +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/avroint.avsc @@ -0,0 +1,14 @@ +{ + "type": "record", + "name": "nifiRecord", + "namespace": "org.apache.nifi", + "fields": [ + { + "name": "avroint", + "type": [ + "int", + "null" + ] + } + ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc index f27f5edfba..8ea7012508 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/schema-correct-data-with-date.avsc @@ -87,6 +87,18 @@ } ] } + }, + { + "name": "numeric", + "type": ["null", "long"] + }, + { + "name": "floatc", + "type": ["null", "double"] + }, + { + "name": "json", + "type": ["null", "string"] } ] } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json index 3fa425a8e1..b6ff43e4e9 100644 --- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json +++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json @@ -21,7 +21,10 @@ "date": "07/18/2021", "time": "12:35:24", "full": "07-18-2021 12:35:24 UTC" - } + }, + "numeric": 0, + "floatc": 0.1, + "json": "{\"key\":\"value\"}" }, { "id": 2, @@ -43,4 +46,4 @@ "full": "01-01-1992 00:00:00 UTC" } } -] \ No newline at end of file +]