NIFI-13478: Protobuf Reader fails to coerce type of repeated fields

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #9024
This commit is contained in:
Mark Bathori 2024-07-02 11:26:19 +02:00 committed by Matt Burgess
parent 715989d48c
commit fd6e62bdd2
2 changed files with 47 additions and 7 deletions

View File

@ -328,7 +328,14 @@ public class ProtobufDataConverter {
if (coerceTypes) {
final Optional<RecordField> recordField = rootRecordSchema.getField(protoField.getFieldName());
if (recordField.isPresent()) {
resultValues = resultValues.stream().map(value -> DataTypeUtils.convertType(value, recordField.get().getDataType(), recordField.get().getFieldName())).toList();
final DataType dataType;
if (protoField.isRepeatable()) {
final ArrayDataType arrayDataType = (ArrayDataType) recordField.get().getDataType();
dataType = arrayDataType.getElementType();
} else {
dataType = recordField.get().getDataType();
}
resultValues = resultValues.stream().map(value -> DataTypeUtils.convertType(value, dataType, recordField.get().getFieldName())).toList();
}
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.services.protobuf;
import com.google.protobuf.Descriptors;
import com.squareup.wire.schema.Schema;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
@ -28,11 +29,14 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.apache.nifi.services.protobuf.ProtoTestUtil.generateInputDataForProto3;
import static org.apache.nifi.services.protobuf.ProtoTestUtil.generateInputDataForRepeatedProto3;
import static org.apache.nifi.services.protobuf.ProtoTestUtil.loadProto3TestSchema;
import static org.apache.nifi.services.protobuf.ProtoTestUtil.loadRepeatedProto3TestSchema;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -131,12 +135,41 @@ public class TestProtobufRecordReader {
assertNull(field4);
}
@Test
public void testReadRecordWithRepeatedFieldsAndCoerceTypeAndDropUnknownFields() throws Descriptors.DescriptorValidationException, IOException {
final ProtobufRecordReader reader = createReader(generateInputDataForRepeatedProto3(), "RootMessage", loadRepeatedProto3TestSchema(), generateRecordSchemaForRepeatedTest());
final Record record = reader.nextRecord(true, true);
final Object[] recordList = (Object[]) record.getValue("repeatedMessage");
assertInstanceOf(Object[].class, recordList);
final MapRecord firstRecord = (MapRecord) recordList[0];
final Object[] field1 = (Object[]) firstRecord.getValue("booleanField");
assertArrayEquals(new Boolean[]{true, false}, field1);
final Object[] field2 = (Object[]) firstRecord.getValue("stringField");
assertArrayEquals(new String[]{"Test text1", "Test text2"}, field2);
final Object field4 = firstRecord.getValue("int32Field");
assertNull(field4);
}
private RecordSchema generateRecordSchema() {
final List<RecordField> fields = new ArrayList<>();
for (final String fieldName : new String[] {"booleanField", "stringField", "int32Field"}) {
fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType()));
}
return new SimpleRecordSchema(fields);
return new SimpleRecordSchema(Arrays.asList(
new RecordField("booleanField", RecordFieldType.STRING.getDataType()),
new RecordField("stringField", RecordFieldType.STRING.getDataType()),
new RecordField("int32Field", RecordFieldType.STRING.getDataType()))
);
}
private RecordSchema generateRecordSchemaForRepeatedTest() {
return new SimpleRecordSchema(List.of(
new RecordField("repeatedMessage", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(new SimpleRecordSchema(Arrays.asList(
new RecordField("booleanField", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BOOLEAN.getDataType())),
new RecordField("stringField", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()))
)))))
));
}
private ProtobufRecordReader createReader(InputStream in, String message, Schema schema, RecordSchema recordSchema) {