mirror of https://github.com/apache/nifi.git
NIFI-7249: Force String keys in maps in DataTypeUtils.inferDataType()
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #4139.
This commit is contained in:
parent
dc95cff3b7
commit
798a8eeb50
|
@ -474,7 +474,23 @@ public class DataTypeUtils {
|
||||||
|
|
||||||
// A value of a Map could be either a Record or a Map type. In either case, it must have Strings as keys.
|
// A value of a Map could be either a Record or a Map type. In either case, it must have Strings as keys.
|
||||||
if (value instanceof Map) {
|
if (value instanceof Map) {
|
||||||
final Map<String, ?> map = (Map<String, ?>) value;
|
final Map<String, Object> map;
|
||||||
|
// Only transform the map if the keys aren't strings
|
||||||
|
boolean allStrings = true;
|
||||||
|
for (final Object key : ((Map<?, ?>) value).keySet()) {
|
||||||
|
if (!(key instanceof String)) {
|
||||||
|
allStrings = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (allStrings) {
|
||||||
|
map = (Map<String, Object>) value;
|
||||||
|
} else {
|
||||||
|
final Map<?, ?> m = (Map<?, ?>) value;
|
||||||
|
map = new HashMap<>(m.size());
|
||||||
|
m.forEach((k, v) -> map.put(k == null ? null : k.toString(), v));
|
||||||
|
}
|
||||||
return inferRecordDataType(map);
|
return inferRecordDataType(map);
|
||||||
// // Check if all types are the same.
|
// // Check if all types are the same.
|
||||||
// if (map.isEmpty()) {
|
// if (map.isEmpty()) {
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.nifi.serialization.record;
|
||||||
|
|
||||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||||
import org.apache.nifi.serialization.record.type.ChoiceDataType;
|
import org.apache.nifi.serialization.record.type.ChoiceDataType;
|
||||||
|
import org.apache.nifi.serialization.record.type.RecordDataType;
|
||||||
import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
||||||
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
|
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -442,6 +443,36 @@ public class TestDataTypeUtils {
|
||||||
assertEquals(expected, actual);
|
assertEquals(expected, actual);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInferTypeWithMapStringKeys() {
|
||||||
|
Map<String, String> map = new HashMap<>();
|
||||||
|
map.put("a", "Hello");
|
||||||
|
map.put("b", "World");
|
||||||
|
|
||||||
|
RecordDataType expected = (RecordDataType)RecordFieldType.RECORD.getRecordDataType(new SimpleRecordSchema(Arrays.asList(
|
||||||
|
new RecordField("a", RecordFieldType.STRING.getDataType()),
|
||||||
|
new RecordField("b", RecordFieldType.STRING.getDataType())
|
||||||
|
)));
|
||||||
|
|
||||||
|
DataType actual = DataTypeUtils.inferDataType(map, null);
|
||||||
|
assertEquals(expected, actual);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInferTypeWithMapNonStringKeys() {
|
||||||
|
Map<Integer, String> map = new HashMap<>();
|
||||||
|
map.put(1, "Hello");
|
||||||
|
map.put(2, "World");
|
||||||
|
|
||||||
|
RecordDataType expected = (RecordDataType)RecordFieldType.RECORD.getRecordDataType(new SimpleRecordSchema(Arrays.asList(
|
||||||
|
new RecordField("1", RecordFieldType.STRING.getDataType()),
|
||||||
|
new RecordField("2", RecordFieldType.STRING.getDataType())
|
||||||
|
)));
|
||||||
|
|
||||||
|
DataType actual = DataTypeUtils.inferDataType(map, null);
|
||||||
|
assertEquals(expected, actual);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFindMostSuitableTypeWithBoolean() {
|
public void testFindMostSuitableTypeWithBoolean() {
|
||||||
testFindMostSuitableType(true, RecordFieldType.BOOLEAN.getDataType());
|
testFindMostSuitableType(true, RecordFieldType.BOOLEAN.getDataType());
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.avro.generic.GenericDatumReader;
|
||||||
import org.apache.avro.generic.GenericFixed;
|
import org.apache.avro.generic.GenericFixed;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.apache.avro.generic.GenericRecordBuilder;
|
import org.apache.avro.generic.GenericRecordBuilder;
|
||||||
|
import org.apache.avro.util.Utf8;
|
||||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||||
import org.apache.nifi.serialization.record.DataType;
|
import org.apache.nifi.serialization.record.DataType;
|
||||||
|
@ -640,4 +641,54 @@ public class TestAvroTypeUtil {
|
||||||
// THEN
|
// THEN
|
||||||
assertEquals(expected, actualAfterReverse);
|
assertEquals(expected, actualAfterReverse);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConvertAvroMap() {
|
||||||
|
// GIVEN
|
||||||
|
Map<?, ?> expected = new HashMap<String, Object>() {{
|
||||||
|
put(
|
||||||
|
"nullableMapField",
|
||||||
|
new HashMap<String, Object>() {{
|
||||||
|
put("key1", "value1");
|
||||||
|
put("key2", "value2");
|
||||||
|
}}
|
||||||
|
);
|
||||||
|
}};
|
||||||
|
|
||||||
|
Schema nullableMapFieldAvroSchema = Schema.createUnion(
|
||||||
|
Schema.create(Type.NULL),
|
||||||
|
Schema.create(Type.INT),
|
||||||
|
Schema.createMap(Schema.create(Type.STRING))
|
||||||
|
);
|
||||||
|
|
||||||
|
Schema avroRecordSchema = Schema.createRecord(
|
||||||
|
"record", "doc", "namespace", false,
|
||||||
|
Arrays.asList(
|
||||||
|
new Field("nullableMapField", nullableMapFieldAvroSchema, "nullable map field", (Object)null)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
Map<?, ?> value = new HashMap<Utf8, Object>(){{
|
||||||
|
put(new Utf8("key1"), "value1");
|
||||||
|
put(new Utf8("key2"), "value2");
|
||||||
|
}};
|
||||||
|
|
||||||
|
Record avroRecord = new GenericRecordBuilder(avroRecordSchema)
|
||||||
|
.set("nullableMapField", value)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
RecordSchema nifiRecordSchema = new SimpleRecordSchema(
|
||||||
|
Arrays.asList(
|
||||||
|
new RecordField("nullableMapField", RecordFieldType.CHOICE.getChoiceDataType(
|
||||||
|
RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType())
|
||||||
|
))
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
Object actual = AvroTypeUtil.convertAvroRecordToMap(avroRecord, nifiRecordSchema);
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
assertEquals(expected, actual);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue