NIFI-13630 Handle Map Avro Type in PutBigQuery

This closes #9151

Signed-off-by: David Handermann <exceptionfactory@apache.org>
Authored by Juldrixx on 2024-08-06 15:49:37 +02:00; committed by exceptionfactory
parent d4344a3140
commit 9fbe6aab74
2 changed files with 96 additions and 2 deletions
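
For context on the fix: before this change, the repeated MESSAGE branch in ProtoUtils cast the incoming value straight to Collection (wrapping arrays via Arrays.asList), which breaks for Avro map fields because a map value reaches ProtoUtils as a java.util.Map, not a Collection. A minimal standalone sketch of that failure mode (hypothetical class name and example value, not NiFi code):

import java.util.Collection;
import java.util.Map;

public class MapCastFailureSketch {
    public static void main(String[] args) {
        // An Avro map field is deserialized to a java.util.Map, e.g.:
        Object value = Map.of("FIELD_1", "field_1");
        try {
            // The pre-patch code applied this cast to every non-array repeated value:
            Collection<?> collection = (Collection<?>) value;
            System.out.println(collection);
        } catch (ClassCastException e) {
            // Map does not implement Collection, so the cast fails at runtime
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}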

ProtoUtils.java

@@ -46,8 +46,20 @@ public class ProtoUtils {
         switch (field.getType()) {
             case MESSAGE:
                 if (field.isRepeated()) {
-                    Collection collection = value.getClass().isArray() ? Arrays.asList((Object[]) value) : (Collection) value;
-                    collection.forEach(act -> builder.addRepeatedField(field, createMessage(field.getMessageType(), (Map<String, Object>) act, tableSchema)));
+                    final Collection<Map<String, Object>> valueMaps;
+                    if (value instanceof Object[] arrayValue) {
+                        valueMaps = Arrays.stream(arrayValue)
+                                .map(item -> (Map<String, Object>) item).toList();
+                    } else if (value instanceof Map<?, ?> mapValue) {
+                        valueMaps = mapValue.entrySet().stream()
+                                .map(entry -> Map.of(
+                                        "key", entry.getKey(),
+                                        "value", entry.getValue()
+                                )).toList();
+                    } else {
+                        valueMaps = (Collection<Map<String, Object>>) value;
+                    }
+                    valueMaps.forEach(act -> builder.addRepeatedField(field, createMessage(field.getMessageType(), act, tableSchema)));
                 } else {
                     builder.setField(field, createMessage(field.getMessageType(), (Map<String, Object>) value, tableSchema));
                 }
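
To make the new branch above concrete: each entry of the map is flattened into a {key, value} map, and each such map becomes one element of the repeated field passed to createMessage. This matches how BigQuery models Avro maps, since BigQuery has no native map type and represents them as a repeated key/value STRUCT. A standalone sketch of just that conversion (hypothetical class name and example values, not NiFi code):

import java.util.List;
import java.util.Map;

public class MapToEntryMapsSketch {
    public static void main(String[] args) {
        // An Avro map field as it arrives from the record reader
        Map<String, String> mapValue = Map.of("FIELD_1", "field_1", "FIELD_2", "field_2");

        // Flatten each entry into a {key, value} map, as the new branch does
        List<Map<String, Object>> valueMaps = mapValue.entrySet().stream()
                .map(entry -> Map.<String, Object>of(
                        "key", entry.getKey(),
                        "value", entry.getValue()
                )).toList();

        // In ProtoUtils, each element would then be handed to createMessage(...)
        // as one repeated key/value struct
        valueMaps.forEach(System.out::println);
    }
}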

PutBigQueryTest.java

@@ -44,6 +44,7 @@ import java.util.stream.Stream;
 import org.apache.nifi.csv.CSVReader;
 import org.apache.nifi.csv.CSVUtils;
 import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
@@ -457,6 +458,26 @@ public class PutBigQueryTest {
         runner.assertTransferCount(PutBigQuery.REL_SUCCESS, 1);
     }
 
+    @Test
+    void testMapFieldSchema() throws Exception {
+        when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);
+        TableSchema myTableSchema = mockJsonTableSchema();
+        when(writeStream.getTableSchema()).thenReturn(myTableSchema);
+        when(streamWriter.append(isA(ProtoRows.class), isA(Long.class)))
+                .thenReturn(ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().setAppendResult(mock(AppendRowsResponse.AppendResult.class)).build()));
+
+        decorateWithJsonRecordReaderWithSchema(runner);
+        runner.setProperty(PutBigQuery.RECORD_READER, "jsonReader");
+
+        runner.enqueue(jsonContent());
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS);
+    }
+
     private void decorateWithRecordReader(TestRunner runner) throws InitializationException {
         CSVReader csvReader = new CSVReader();
         runner.addControllerService("csvReader", csvReader);
@@ -484,6 +505,30 @@ public class PutBigQueryTest {
         runner.enableControllerService(csvReader);
     }
 
+    private void decorateWithJsonRecordReaderWithSchema(TestRunner runner) throws InitializationException {
+        String recordReaderSchema = """
+                {
+                  "name": "recordFormatName",
+                  "namespace": "nifi.examples",
+                  "type": "record",
+                  "fields": [
+                    {
+                      "name": "field",
+                      "type": {
+                        "type": "map",
+                        "values": "string"
+                      }
+                    }
+                  ]
+                }""";
+
+        JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("jsonReader", jsonReader);
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, recordReaderSchema);
+        runner.enableControllerService(jsonReader);
+    }
+
     private TableSchema mockTableSchema(String name1, TableFieldSchema.Type type1, String name2, TableFieldSchema.Type type2) {
         TableSchema myTableSchema = mock(TableSchema.class);
@@ -503,6 +548,30 @@ public class PutBigQueryTest {
         return myTableSchema;
     }
 
+    private TableSchema mockJsonTableSchema() {
+        TableSchema myTableSchema = mock(TableSchema.class);
+
+        TableFieldSchema keyFieldSchema = mock(TableFieldSchema.class);
+        when(keyFieldSchema.getName()).thenReturn("key");
+        when(keyFieldSchema.getType()).thenReturn(TableFieldSchema.Type.STRING);
+        when(keyFieldSchema.getMode()).thenReturn(TableFieldSchema.Mode.REQUIRED);
+
+        TableFieldSchema valueFieldSchema = mock(TableFieldSchema.class);
+        when(valueFieldSchema.getName()).thenReturn("value");
+        when(valueFieldSchema.getType()).thenReturn(TableFieldSchema.Type.STRING);
+        when(valueFieldSchema.getMode()).thenReturn(TableFieldSchema.Mode.NULLABLE);
+
+        TableFieldSchema tableFieldSchemaId = mock(TableFieldSchema.class);
+        when(tableFieldSchemaId.getName()).thenReturn("field");
+        when(tableFieldSchemaId.getType()).thenReturn(TableFieldSchema.Type.STRUCT);
+        when(tableFieldSchemaId.getMode()).thenReturn(TableFieldSchema.Mode.REPEATED);
+        when(tableFieldSchemaId.getFieldsList()).thenReturn(List.of(keyFieldSchema, valueFieldSchema));
+
+        when(myTableSchema.getFieldsList()).thenReturn(List.of(tableFieldSchemaId));
+
+        return myTableSchema;
+    }
+
     private String csvContentWithLines(int lineNum) {
         StringBuilder builder = new StringBuilder();
         builder.append(CSV_HEADER);
@@ -516,4 +585,17 @@ public class PutBigQueryTest {
         return builder.toString();
     }
 
+    private String jsonContent() {
+        return """
+                {
+                  "field": {
+                    "FIELD_1": "field_1",
+                    "FIELD_2": "field_2",
+                    "FIELD_3": "field_3",
+                    "FIELD_4": "field_4",
+                    "FIELD_5": "field_5"
+                  }
+                }""";
+    }
 }
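
For reference, the mocks in mockJsonTableSchema emulate the real write-stream schema sketched below, built with the protobuf builders from google-cloud-bigquerystorage (the class and method names here are hypothetical). It shows how the test's Avro map<string> field surfaces in BigQuery as a REPEATED STRUCT with a REQUIRED key and a NULLABLE value:

import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableSchema;

public class MapTableSchemaSketch {
    static TableSchema mapFieldTableSchema() {
        TableFieldSchema key = TableFieldSchema.newBuilder()
                .setName("key")
                .setType(TableFieldSchema.Type.STRING)
                .setMode(TableFieldSchema.Mode.REQUIRED)
                .build();
        TableFieldSchema value = TableFieldSchema.newBuilder()
                .setName("value")
                .setType(TableFieldSchema.Type.STRING)
                .setMode(TableFieldSchema.Mode.NULLABLE)
                .build();
        // The map field itself: a repeated struct holding the key/value pair
        TableFieldSchema field = TableFieldSchema.newBuilder()
                .setName("field")
                .setType(TableFieldSchema.Type.STRUCT)
                .setMode(TableFieldSchema.Mode.REPEATED)
                .addFields(key)
                .addFields(value)
                .build();
        return TableSchema.newBuilder().addFields(field).build();
    }
}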