diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
index edcfcfd14b..54105d9592 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
@@ -115,13 +115,11 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
     private final AtomicReference error = new AtomicReference<>();
     private final AtomicInteger appendSuccessCount = new AtomicInteger(0);
     private final Phaser inflightRequestCount = new Phaser(1);
-    private TableName tableName = null;
     private BigQueryWriteClient writeClient = null;
     private StreamWriter streamWriter = null;
     private String transferType;
     private int maxRetryCount;
     private int recordBatchCount;
-    private boolean skipInvalidRows;
 
     public static final PropertyDescriptor PROJECT_ID = new PropertyDescriptor.Builder()
         .fromPropertyDescriptor(AbstractBigQueryProcessor.PROJECT_ID)
@@ -185,12 +183,9 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
     @OnScheduled
     public void onScheduled(ProcessContext context) {
         super.onScheduled(context);
-
         transferType = context.getProperty(TRANSFER_TYPE).getValue();
         maxRetryCount = context.getProperty(RETRY_COUNT).asInteger();
-        skipInvalidRows = context.getProperty(SKIP_INVALID_ROWS).asBoolean();
         recordBatchCount = context.getProperty(APPEND_RECORD_COUNT).asInteger();
-        tableName = TableName.of(context.getProperty(PROJECT_ID).getValue(), context.getProperty(DATASET).getValue(), context.getProperty(TABLE_NAME).getValue());
         writeClient = createWriteClient(getGoogleCredentials(context));
     }
 
@@ -201,30 +196,37 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) {
-        WriteStream writeStream;
-        Descriptors.Descriptor protoDescriptor;
-        try {
-            writeStream = createWriteStream();
-            protoDescriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(writeStream.getTableSchema());
-            streamWriter = createStreamWriter(writeStream.getName(), protoDescriptor, getGoogleCredentials(context));
-        } catch (Descriptors.DescriptorValidationException | IOException e) {
-            getLogger().error("Failed to create Big Query Stream Writer for writing", e);
-            context.yield();
-            return;
-        }
-
         FlowFile flowFile = session.get();
         if (flowFile == null) {
             return;
         }
 
+        final String projectId = context.getProperty(PROJECT_ID).getValue();
+        final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
+        final String dataTableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final TableName tableName = TableName.of(projectId, dataset, dataTableName);
+
+        WriteStream writeStream;
+        Descriptors.Descriptor protoDescriptor;
+        try {
+            writeStream = createWriteStream(tableName);
+            protoDescriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(writeStream.getTableSchema());
+            streamWriter = createStreamWriter(writeStream.getName(), protoDescriptor, getGoogleCredentials(context));
+        } catch (Descriptors.DescriptorValidationException | IOException e) {
+            getLogger().error("Failed to create Big Query Stream Writer for writing", e);
+            context.yield();
+            session.rollback();
+            return;
+        }
+
+        final boolean skipInvalidRows = context.getProperty(SKIP_INVALID_ROWS).evaluateAttributeExpressions(flowFile).asBoolean();
         final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
 
         int recordNumWritten;
         try {
             try (InputStream in = session.read(flowFile);
                 RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
-                recordNumWritten = writeRecordsToStream(reader, protoDescriptor);
+                recordNumWritten = writeRecordsToStream(reader, protoDescriptor, skipInvalidRows);
             }
             flowFile = session.putAttribute(flowFile, BigQueryAttributes.JOB_NB_RECORDS_ATTR, Integer.toString(recordNumWritten));
         } catch (Exception e) {
@@ -234,13 +236,13 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
         }
     }
 
-    private int writeRecordsToStream(RecordReader reader, Descriptors.Descriptor descriptor) throws Exception {
+    private int writeRecordsToStream(RecordReader reader, Descriptors.Descriptor descriptor, boolean skipInvalidRows) throws Exception {
        Record currentRecord;
        int offset = 0;
        int recordNum = 0;
        ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
        while ((currentRecord = reader.nextRecord()) != null) {
-            DynamicMessage message = recordToProtoMessage(currentRecord, descriptor);
+            DynamicMessage message = recordToProtoMessage(currentRecord, descriptor, skipInvalidRows);
 
             if (message == null) {
                 continue;
@@ -262,7 +264,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
         return recordNum;
     }
 
-    private DynamicMessage recordToProtoMessage(Record record, Descriptors.Descriptor descriptor) {
+    private DynamicMessage recordToProtoMessage(Record record, Descriptors.Descriptor descriptor, boolean skipInvalidRows) {
         Map valueMap = convertMapRecord(record.toMap());
         DynamicMessage message = null;
         try {
@@ -369,7 +371,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
         }
     }
 
-    private WriteStream createWriteStream() {
+    private WriteStream createWriteStream(TableName tableName) {
         WriteStream.Type type = isBatch() ? WriteStream.Type.PENDING : WriteStream.Type.COMMITTED;
         CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest.newBuilder()
             .setParent(tableName.toString())
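
Note on the pattern this patch applies: table resolution moves from @OnScheduled into onTrigger() so that DATASET and TABLE_NAME, which support NiFi Expression Language, are evaluated against each FlowFile's attributes rather than once per schedule. Below is a minimal sketch of that pattern in isolation, not part of the patch; it assumes the property descriptors and processor scaffolding of the class above, and the method body would live inside that processor class.

    import com.google.cloud.bigquery.storage.v1.TableName;
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;

    // Sketch only: per-FlowFile evaluation of Expression-Language-enabled
    // properties. PROJECT_ID, DATASET and TABLE_NAME are the descriptors
    // defined on the processor above.
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return; // no work until a FlowFile is available
        }

        // A DATASET value such as "${bq.dataset}" resolves against this
        // FlowFile's attributes, so each FlowFile may target a different
        // BigQuery table.
        final String dataset = context.getProperty(DATASET)
            .evaluateAttributeExpressions(flowFile).getValue();
        final String table = context.getProperty(TABLE_NAME)
            .evaluateAttributeExpressions(flowFile).getValue();
        final TableName tableName = TableName.of(
            context.getProperty(PROJECT_ID).getValue(), dataset, table);
        // ... create the write stream and stream writer for tableName,
        // as the patch does ...
    }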