NIFI-11608 Fixed Expression Language Evaluation in PutBigQuery

This closes #7316

Signed-off-by: David Handermann <exceptionfactory@apache.org>
Authored by Steven Matison, 2023-05-31 07:31:27 -04:00; committed by exceptionfactory
parent 007bf3bcec
commit 645618a609
1 changed file with 24 additions and 22 deletions
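In brief: the Dataset and Table Name properties (and Skip Invalid Rows) support Expression Language against FlowFile attributes, but were previously resolved once in @OnScheduled, where no FlowFile is in scope, so attribute-based expressions were never evaluated. The fix resolves them per FlowFile in onTrigger. A minimal sketch of the NiFi pattern involved (the descriptor below is illustrative, not the commit's code):

    // Imports from nifi-api; the snippet lives inside a processor class.
    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.expression.ExpressionLanguageScope;
    import org.apache.nifi.processor.util.StandardValidators;

    // A property accepts Expression Language against FlowFile attributes only when
    // declared with FLOWFILE_ATTRIBUTES scope; resolving it requires a concrete
    // FlowFile, which exists in onTrigger but not in @OnScheduled.
    static final PropertyDescriptor DATASET = new PropertyDescriptor.Builder()
            .name("bq.dataset")
            .displayName("Dataset")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
            .build();

    // Per-FlowFile resolution inside onTrigger(ProcessContext, ProcessSession):
    final String dataset = context.getProperty(DATASET)
            .evaluateAttributeExpressions(flowFile)
            .getValue();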

@@ -115,13 +115,11 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
     private final AtomicReference<Exception> error = new AtomicReference<>();
     private final AtomicInteger appendSuccessCount = new AtomicInteger(0);
     private final Phaser inflightRequestCount = new Phaser(1);
-    private TableName tableName = null;
     private BigQueryWriteClient writeClient = null;
     private StreamWriter streamWriter = null;
     private String transferType;
     private int maxRetryCount;
     private int recordBatchCount;
-    private boolean skipInvalidRows;

     public static final PropertyDescriptor PROJECT_ID = new PropertyDescriptor.Builder()
         .fromPropertyDescriptor(AbstractBigQueryProcessor.PROJECT_ID)
@@ -185,12 +183,9 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
     @OnScheduled
     public void onScheduled(ProcessContext context) {
         super.onScheduled(context);
         transferType = context.getProperty(TRANSFER_TYPE).getValue();
         maxRetryCount = context.getProperty(RETRY_COUNT).asInteger();
-        skipInvalidRows = context.getProperty(SKIP_INVALID_ROWS).asBoolean();
         recordBatchCount = context.getProperty(APPEND_RECORD_COUNT).asInteger();
-        tableName = TableName.of(context.getProperty(PROJECT_ID).getValue(), context.getProperty(DATASET).getValue(), context.getProperty(TABLE_NAME).getValue());
         writeClient = createWriteClient(getGoogleCredentials(context));
     }
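With the table coordinates no longer cached at schedule time, each FlowFile can target its own table through attributes. A hypothetical nifi-mock sketch of that behavior (record content, reader, and GCP credential setup are elided, so this is not runnable as written):

    import java.util.Map;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    TestRunner runner = TestRunners.newTestRunner(PutBigQuery.class);
    runner.setProperty(PutBigQuery.DATASET, "${bq.dataset}");
    runner.setProperty(PutBigQuery.TABLE_NAME, "${bq.table}");

    // Two FlowFiles, two target tables. Before this fix, both would have used the
    // single value resolved at schedule time, with the expressions left unevaluated.
    runner.enqueue("...", Map.of("bq.dataset", "sales", "bq.table", "orders"));
    runner.enqueue("...", Map.of("bq.dataset", "sales", "bq.table", "refunds"));
    runner.run(2); // each onTrigger call resolves the table from that FlowFile's attributes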
@@ -201,30 +196,37 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) {
-        WriteStream writeStream;
-        Descriptors.Descriptor protoDescriptor;
-        try {
-            writeStream = createWriteStream();
-            protoDescriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(writeStream.getTableSchema());
-            streamWriter = createStreamWriter(writeStream.getName(), protoDescriptor, getGoogleCredentials(context));
-        } catch (Descriptors.DescriptorValidationException | IOException e) {
-            getLogger().error("Failed to create Big Query Stream Writer for writing", e);
-            context.yield();
-            return;
-        }
-
         FlowFile flowFile = session.get();
         if (flowFile == null) {
             return;
         }

+        final String projectId = context.getProperty(PROJECT_ID).getValue();
+        final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
+        final String dataTableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final TableName tableName = TableName.of(projectId, dataset, dataTableName);
+
+        WriteStream writeStream;
+        Descriptors.Descriptor protoDescriptor;
+        try {
+            writeStream = createWriteStream(tableName);
+            protoDescriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(writeStream.getTableSchema());
+            streamWriter = createStreamWriter(writeStream.getName(), protoDescriptor, getGoogleCredentials(context));
+        } catch (Descriptors.DescriptorValidationException | IOException e) {
+            getLogger().error("Failed to create Big Query Stream Writer for writing", e);
+            context.yield();
+            session.rollback();
+            return;
+        }
+
+        final boolean skipInvalidRows = context.getProperty(SKIP_INVALID_ROWS).evaluateAttributeExpressions(flowFile).asBoolean();
+
         final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         int recordNumWritten;
         try {
             try (InputStream in = session.read(flowFile);
                 RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
-                recordNumWritten = writeRecordsToStream(reader, protoDescriptor);
+                recordNumWritten = writeRecordsToStream(reader, protoDescriptor, skipInvalidRows);
             }
             flowFile = session.putAttribute(flowFile, BigQueryAttributes.JOB_NB_RECORDS_ATTR, Integer.toString(recordNumWritten));
         } catch (Exception e) {
@@ -234,13 +236,13 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
         }
     }

-    private int writeRecordsToStream(RecordReader reader, Descriptors.Descriptor descriptor) throws Exception {
+    private int writeRecordsToStream(RecordReader reader, Descriptors.Descriptor descriptor, boolean skipInvalidRows) throws Exception {
         Record currentRecord;
         int offset = 0;
         int recordNum = 0;
         ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
         while ((currentRecord = reader.nextRecord()) != null) {
-            DynamicMessage message = recordToProtoMessage(currentRecord, descriptor);
+            DynamicMessage message = recordToProtoMessage(currentRecord, descriptor, skipInvalidRows);
             if (message == null) {
                 continue;
@@ -262,7 +264,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
         return recordNum;
     }

-    private DynamicMessage recordToProtoMessage(Record record, Descriptors.Descriptor descriptor) {
+    private DynamicMessage recordToProtoMessage(Record record, Descriptors.Descriptor descriptor, boolean skipInvalidRows) {
         Map<String, Object> valueMap = convertMapRecord(record.toMap());
         DynamicMessage message = null;
         try {
@@ -369,7 +371,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
         }
     }

-    private WriteStream createWriteStream() {
+    private WriteStream createWriteStream(TableName tableName) {
         WriteStream.Type type = isBatch() ? WriteStream.Type.PENDING : WriteStream.Type.COMMITTED;
         CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest.newBuilder()
             .setParent(tableName.toString())
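The final hunk ends mid-statement; for reference, a sketch of how such a request is typically completed and issued against the BigQuery Storage Write API (assumed shape, not the commit's verbatim code):

    import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
    import com.google.cloud.bigquery.storage.v1.WriteStream;

    // Create a PENDING (batch) or COMMITTED (streaming) write stream on the
    // per-FlowFile table; the parent is "projects/{p}/datasets/{d}/tables/{t}".
    CreateWriteStreamRequest request = CreateWriteStreamRequest.newBuilder()
            .setParent(tableName.toString())
            .setWriteStream(WriteStream.newBuilder().setType(type).build())
            .build();
    WriteStream writeStream = writeClient.createWriteStream(request);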