NIFI-11402 - PutBigQuery fix for case sensitivity and error handling

This closes #7140.

Signed-off-by: Csaba Bejan <bejan.csaba@gmail.com>
Pierre Villard authored on 2023-04-08 12:15:42 +03:00; committed by Csaba Bejan
parent 3df8c9d2ac
commit bdcd4fcfda
2 changed files with 31 additions and 7 deletions


@@ -218,18 +218,16 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
             return;
         }
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         int recordNumWritten;
         try {
-            try (InputStream in = session.read(flowFile)) {
-                RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
-                try (RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
-                    recordNumWritten = writeRecordsToStream(reader, protoDescriptor);
-                }
+            try (InputStream in = session.read(flowFile);
+                 RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+                recordNumWritten = writeRecordsToStream(reader, protoDescriptor);
             }
             flowFile = session.putAttribute(flowFile, BigQueryAttributes.JOB_NB_RECORDS_ATTR, Integer.toString(recordNumWritten));
         } catch (Exception e) {
             getLogger().error("Writing Records failed", e);
             error.set(e);
         } finally {
             finishProcessing(session, flowFile, streamWriter, writeStream.getName(), tableName.toString());
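The hunk above hoists the RecordReaderFactory lookup out of the stream handling and collapses the nested try blocks into a single try-with-resources statement. A minimal standalone sketch of the same pattern, using plain JDK streams in place of the NiFi record API; the class name and payload are invented for illustration:

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class TryWithResourcesSketch {
    public static void main(String[] args) throws Exception {
        byte[] payload = "id,name\n1,alice\n".getBytes(StandardCharsets.UTF_8);
        // Both resources are declared in one statement; they are closed in
        // reverse declaration order (reader first, then stream), even if reading throws.
        try (InputStream in = new ByteArrayInputStream(payload);
             BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            System.out.println(reader.readLine()); // prints the header row
        }
    }
}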
@@ -303,6 +301,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
             flowFile = session.putAttribute(flowFile, BigQueryAttributes.JOB_NB_RECORDS_ATTR, isBatch() ? "0" : String.valueOf(appendSuccessCount.get() * recordBatchCount));
             session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
+            error.set(null); // reset the error to null for the next execution
         } else {
             if (isBatch()) {
                 writeClient.finalizeWriteStream(streamName);
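This single added line is the error-handling half of the fix: the processor keeps the last append failure in a shared error holder, and without clearing it after routing to failure, the stale exception would also send the next, otherwise healthy FlowFile to failure. A minimal sketch of the pitfall, assuming an AtomicReference error holder; the class and method names are illustrative, not the processor's actual members:

import java.util.concurrent.atomic.AtomicReference;

public class ErrorResetSketch {
    // Stands in for the processor's shared error holder (illustrative name).
    private final AtomicReference<Exception> error = new AtomicReference<>();

    void onTrigger(boolean appendFails) {
        if (appendFails) {
            error.set(new RuntimeException("append failed"));
        }
        if (error.get() != null) {
            System.out.println("route to failure");
            error.set(null); // without this reset, the next execution would also fail
        } else {
            System.out.println("route to success");
        }
    }

    public static void main(String[] args) {
        ErrorResetSketch processor = new ErrorResetSketch();
        processor.onTrigger(true);  // first FlowFile: append fails, routed to failure
        processor.onTrigger(false); // second FlowFile: succeeds only because the error was reset
    }
}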
@@ -434,6 +433,9 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
         Map<String, Object> result = new HashMap<>();
         for (String key : map.keySet()) {
             Object obj = map.get(key);
+            // BigQuery is not case sensitive on column names, but the protobuf message
+            // expects all column names to be lower case
+            key = key.toLowerCase();
             if (obj instanceof MapRecord) {
                 result.put(key, convertMapRecord(((MapRecord) obj).toMap()));
             } else if (obj instanceof Object[]
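The three added lines are the case-sensitivity half of the fix: column names are normalized to lower case before the record is turned into a protobuf message. A simplified sketch of the same normalization over plain Maps; the real code walks NiFi MapRecord values, and the class here is invented for illustration:

import java.util.HashMap;
import java.util.Map;

public class LowerCaseKeysSketch {
    static Map<String, Object> lowerCaseKeys(Map<String, Object> map) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) value;
                value = lowerCaseKeys(nested); // recurse into nested records
            }
            result.put(entry.getKey().toLowerCase(), value);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("UserId", 42);
        row.put("Address", new HashMap<>(Map.of("ZipCode", "1011")));
        System.out.println(lowerCaseKeys(row)); // e.g. {userid=42, address={zipcode=1011}}
    }
}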


@@ -385,6 +385,28 @@ public class PutBigQueryTest extends AbstractBQTest {
         runner.assertTransferCount(PutBigQuery.REL_SUCCESS, 0);
     }

+    @Test
+    void testNextFlowFileProcessedWhenIntermittentErrorResolved() {
+        when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);
+        TableSchema myTableSchema = mockTableSchema(FIELD_1_NAME, TableFieldSchema.Type.STRING, FIELD_2_NAME, TableFieldSchema.Type.STRING);
+        when(writeStream.getTableSchema()).thenReturn(myTableSchema);
+        when(streamWriter.append(isA(ProtoRows.class), isA(Long.class)))
+            .thenReturn(ApiFutures.immediateFailedFuture(new StatusRuntimeException(Status.INTERNAL)))
+            .thenReturn(ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().setAppendResult(mock(AppendRowsResponse.AppendResult.class)).build()));
+
+        runner.setProperty(PutBigQuery.RETRY_COUNT, "0");
+
+        runner.enqueue(csvContentWithLines(1));
+        runner.enqueue(csvContentWithLines(1));
+        runner.run(2);
+
+        verify(streamWriter, times(2)).append(any(ProtoRows.class), anyLong());
+        runner.assertQueueEmpty();
+        runner.assertTransferCount(PutBigQuery.REL_FAILURE, 1);
+        runner.assertTransferCount(PutBigQuery.REL_SUCCESS, 1);
+    }
+
     private void decorateWithRecordReader(TestRunner runner) throws InitializationException {
         CSVReader csvReader = new CSVReader();
         runner.addControllerService("csvReader", csvReader);
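The new test exercises the error-reset fix end to end: the stubbed stream writer fails the first append with StatusRuntimeException(Status.INTERNAL) and succeeds on the second, and with RETRY_COUNT set to 0 the first FlowFile is routed to REL_FAILURE while the second is routed to REL_SUCCESS, which only happens because the stale error was cleared between executions.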