NIFI-3918: Added Choice mapping to JsonTreeRowRecordReader.

This commit is contained in:
Koji Kawamura 2017-05-17 14:23:42 +09:00 committed by Mark Payne
parent 33dc3e36fb
commit f019d509f3
2 changed files with 34 additions and 0 deletions

View File

@ -186,6 +186,9 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
return null; return null;
} }
} }
case CHOICE: {
return DataTypeUtils.convertType(getRawNodeValue(fieldNode), desiredType, fieldName);
}
} }
return null; return null;

View File

@ -44,6 +44,7 @@ import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
@ -183,6 +184,36 @@ public class TestJsonTreeRowRecordReader {
} }
} }
@Test
public void testSingleJsonElementWithChoiceFields() throws IOException, MalformedRecordException {
// Wraps default fields by Choice data type to test mapping to a Choice type.
final List<RecordField> choiceFields = getDefaultFields().stream()
.map(f -> new RecordField(f.getFieldName(), RecordFieldType.CHOICE.getChoiceDataType(f.getDataType()))).collect(Collectors.toList());
final RecordSchema schema = new SimpleRecordSchema(choiceFields);
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json"));
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
assertEquals(expectedFieldNames, fieldNames);
final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
final List<RecordField> fields = schema.getFields();
for (int i = 0; i < schema.getFields().size(); i++) {
assertTrue(fields.get(i).getDataType() instanceof ChoiceDataType);
final ChoiceDataType choiceDataType = (ChoiceDataType) fields.get(i).getDataType();
assertEquals(expectedTypes.get(i), choiceDataType.getPossibleSubTypes().get(0).getFieldType());
}
final Object[] firstRecordValues = reader.nextRecord().getValues();
Assert.assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
assertNull(reader.nextRecord());
}
}
@Test @Test
public void testElementWithNestedData() throws IOException, MalformedRecordException { public void testElementWithNestedData() throws IOException, MalformedRecordException {
final DataType accountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema()); final DataType accountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());