NIFI-5943 support conversions from List to Avro ARRAY and from Map to Avro RECORD

NIFI-5943 Added another unit test to verify list + map conversion to list of records. (Mike Thomsen)

This closes #3267

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
Alex Savitsky 2019-01-14 08:49:13 -05:00 committed by Mike Thomsen
parent 45b32e3bc1
commit e7ae97797e
2 changed files with 85 additions and 19 deletions

View File

@ -59,6 +59,7 @@ import java.sql.Time;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
@ -754,10 +755,20 @@ public class AvroTypeUtil {
case RECORD:
final GenericData.Record avroRecord = new GenericData.Record(fieldSchema);
final Record record = (Record) rawValue;
for (final RecordField recordField : record.getSchema().getFields()) {
final Object recordFieldValue = record.getValue(recordField);
final String recordFieldName = recordField.getFieldName();
final Set<Map.Entry<String, Object>> entries;
if (rawValue instanceof Map) {
final Map<String, Object> map = (Map<String, Object>) rawValue;
entries = map.entrySet();
} else if (rawValue instanceof Record) {
entries = new HashSet<>();
final Record record = (Record) rawValue;
record.getSchema().getFields().forEach(field -> entries.add(new AbstractMap.SimpleEntry<>(field.getFieldName(), record.getValue(field))));
} else {
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a Record");
}
for (final Map.Entry<String, Object> e : entries) {
final Object recordFieldValue = e.getValue();
final String recordFieldName = e.getKey();
final Field field = fieldSchema.getField(recordFieldName);
if (field == null) {
@ -771,7 +782,14 @@ public class AvroTypeUtil {
case UNION:
return convertUnionFieldValue(rawValue, fieldSchema, schema -> convertToAvroObject(rawValue, schema, fieldName, charset), fieldName);
case ARRAY:
final Object[] objectArray = (Object[]) rawValue;
final Object[] objectArray;
if (rawValue instanceof List) {
objectArray = ((List) rawValue).toArray();
} else if (rawValue instanceof Object[]) {
objectArray = (Object[]) rawValue;
} else {
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to an Array");
}
final List<Object> list = new ArrayList<>(objectArray.length);
int i = 0;
for (final Object o : objectArray) {

View File

@ -45,11 +45,13 @@ import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@ -455,23 +457,23 @@ public class TestAvroTypeUtil {
@Test
public void testAliasCreatedForInvalidField() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("valid", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("$invalid2", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("3invalid3", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField(" __ Another ONE!!", RecordFieldType.STRING.getDataType()));
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("valid", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("$invalid2", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("3invalid3", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField(" __ Another ONE!!", RecordFieldType.STRING.getDataType()));
final RecordSchema recordSchema = new SimpleRecordSchema(fields);
final RecordSchema recordSchema = new SimpleRecordSchema(fields);
final Schema avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);
assertNotNull(avroSchema.getField("valid"));
final Schema avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);
assertNotNull(avroSchema.getField("valid"));
assertNull(avroSchema.getField("$invalid"));
final Field field2 = avroSchema.getField("_invalid2");
assertNotNull(field2);
assertEquals("_invalid2", field2.name());
assertEquals(1, field2.aliases().size());
assertTrue(field2.aliases().contains("$invalid2"));
assertNull(avroSchema.getField("$invalid"));
final Field field2 = avroSchema.getField("_invalid2");
assertNotNull(field2);
assertEquals("_invalid2", field2.name());
assertEquals(1, field2.aliases().size());
assertTrue(field2.aliases().contains("$invalid2"));
assertNull(avroSchema.getField("$invalid3"));
final Field field3 = avroSchema.getField("_invalid3");
@ -486,6 +488,52 @@ public class TestAvroTypeUtil {
assertEquals("_____Another_ONE__", field4.name());
assertEquals(1, field4.aliases().size());
assertTrue(field4.aliases().contains(" __ Another ONE!!"));
}
public void testListToArrayConversion() {
final Charset charset = Charset.forName("UTF-8");
Object o = AvroTypeUtil.convertToAvroObject(Collections.singletonList("Hello"), Schema.createArray(Schema.create(Type.STRING)), charset);
assertTrue(o instanceof List);
assertEquals(1, ((List) o).size());
assertEquals("Hello", ((List) o).get(0));
}
@Test
public void testMapToRecordConversion() {
final Charset charset = Charset.forName("UTF-8");
Object o = AvroTypeUtil.convertToAvroObject(Collections.singletonMap("Hello", "World"),
Schema.createRecord(Collections.singletonList(new Field("Hello", Schema.create(Type.STRING), "", ""))), charset);
assertTrue(o instanceof Record);
assertEquals("World", ((Record) o).get("Hello"));
}
@Test
public void testListAndMapConversion() {
Schema s = Schema.createRecord(Arrays.asList(
new Field("List", Schema.createArray(Schema.createRecord(
Arrays.asList(
new Field("Message", Schema.create(Type.STRING), "", "")
)
)), "", null)
));
Map<String, Object> obj = new HashMap<>();
List<Map<String, Object>> list = new ArrayList<>();
for (int x = 0; x < 10; x++) {
list.add(new HashMap<String, Object>(){{
put("Message", UUID.randomUUID().toString());
}});
}
obj.put("List", list);
Object o = AvroTypeUtil.convertToAvroObject(obj, s);
assertTrue(o instanceof Record);
List innerList = (List)((Record)o).get("List");
assertNotNull( innerList );
assertEquals(10, innerList.size());
for (Object inner : innerList) {
assertTrue(inner instanceof Record);
assertNotNull(((Record)inner).get("Message"));
}
}
}