mirror of https://github.com/apache/nifi.git
NIFI-9981 Added support for Avro UUID types
This closes #6013 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
5980625014
commit
a3e8048b2d
|
@ -95,6 +95,11 @@ public enum RecordFieldType {
|
|||
*/
|
||||
TIME("time", "HH:mm:ss"),
|
||||
|
||||
/**
|
||||
* A UUID data type. Fields of this type us a {@code java.util.UUID} value
|
||||
*/
|
||||
UUID("uuid"),
|
||||
|
||||
/**
|
||||
* A char field type. Fields of this type use a {@code char} value.
|
||||
*/
|
||||
|
|
|
@ -38,6 +38,7 @@ import java.io.Reader;
|
|||
import java.lang.reflect.Array;
|
||||
import java.math.BigDecimal;
|
||||
import java.math.BigInteger;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.sql.Blob;
|
||||
|
@ -72,6 +73,7 @@ import java.util.Optional;
|
|||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.TimeZone;
|
||||
import java.util.UUID;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
|
@ -210,6 +212,8 @@ public class DataTypeUtils {
|
|||
return toTime(value, timeFormat, fieldName);
|
||||
case TIMESTAMP:
|
||||
return toTimestamp(value, timestampFormat, fieldName);
|
||||
case UUID:
|
||||
return toUUID(value);
|
||||
case ARRAY:
|
||||
return toArray(value, fieldName, ((ArrayDataType)dataType).getElementType(), charset);
|
||||
case MAP:
|
||||
|
@ -233,6 +237,40 @@ public class DataTypeUtils {
|
|||
return null;
|
||||
}
|
||||
|
||||
private static Object toUUID(Object value) {
|
||||
if (value == null) {
|
||||
throw new IllegalTypeConversionException("Null values cannot be converted to a UUID");
|
||||
}
|
||||
|
||||
if (value instanceof String) {
|
||||
try {
|
||||
return UUID.fromString((String)value);
|
||||
} catch (Exception ex) {
|
||||
throw new IllegalTypeConversionException(String.format("Could not parse %s into a UUID", value), ex);
|
||||
}
|
||||
} else if (value instanceof byte[]) {
|
||||
return uuidFromBytes((byte[])value);
|
||||
} else if (value instanceof Byte[]) {
|
||||
Byte[] array = (Byte[])value;
|
||||
byte[] converted = new byte[array.length];
|
||||
for (int x = 0; x < array.length; x++) {
|
||||
converted[x] = array[x];
|
||||
}
|
||||
return uuidFromBytes(converted);
|
||||
} else {
|
||||
throw new IllegalTypeConversionException(value.getClass() + " cannot be converted into a UUID");
|
||||
}
|
||||
}
|
||||
|
||||
private static UUID uuidFromBytes(byte[] bytes) {
|
||||
try {
|
||||
ByteBuffer buffer = ByteBuffer.wrap(bytes);
|
||||
return new UUID(buffer.getLong(), buffer.getLong());
|
||||
} catch (Exception ex) {
|
||||
throw new IllegalTypeConversionException("Could not convert bytes to UUID");
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean isCompatibleDataType(final Object value, final DataType dataType) {
|
||||
return isCompatibleDataType(value, dataType, false);
|
||||
}
|
||||
|
@ -712,6 +750,20 @@ public class DataTypeUtils {
|
|||
return dest;
|
||||
}
|
||||
|
||||
if (value instanceof UUID) {
|
||||
UUID uuid = (UUID)value;
|
||||
ByteBuffer buffer = ByteBuffer.allocate(16);
|
||||
buffer.putLong(uuid.getMostSignificantBits());
|
||||
buffer.putLong(uuid.getLeastSignificantBits());
|
||||
Byte[] result = new Byte[16];
|
||||
byte[] array = buffer.array();
|
||||
for (int index = 0; index < array.length; index++) {
|
||||
result[index] = array[index];
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
if (value instanceof List) {
|
||||
final List<?> list = (List<?>)value;
|
||||
return list.toArray();
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.junit.jupiter.api.Test;
|
|||
|
||||
import java.math.BigDecimal;
|
||||
import java.math.BigInteger;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.sql.Date;
|
||||
import java.sql.Timestamp;
|
||||
|
@ -49,6 +50,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.TimeZone;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.DoubleAdder;
|
||||
import java.util.function.Function;
|
||||
|
@ -133,6 +135,57 @@ public class TestDataTypeUtils {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUUIDStringToUUIDObject() {
|
||||
UUID generated = UUID.randomUUID();
|
||||
String uuidString = generated.toString();
|
||||
|
||||
Object result = DataTypeUtils.convertType(uuidString, RecordFieldType.UUID.getDataType(), "uuid_test");
|
||||
assertTrue(result instanceof UUID);
|
||||
assertEquals(generated, result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUUIDObjectToUUIDString() {
|
||||
UUID generated = UUID.randomUUID();
|
||||
String uuid = generated.toString();
|
||||
|
||||
Object result = DataTypeUtils.convertType(generated, RecordFieldType.STRING.getDataType(), "uuid_test");
|
||||
assertTrue(result instanceof String);
|
||||
assertEquals(uuid, result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUUIDToByteArray() {
|
||||
UUID generated = UUID.randomUUID();
|
||||
ByteBuffer buffer = ByteBuffer.allocate(16);
|
||||
buffer.putLong(generated.getMostSignificantBits());
|
||||
buffer.putLong(generated.getLeastSignificantBits());
|
||||
byte[] expected = buffer.array();
|
||||
|
||||
Object result = DataTypeUtils.convertType(expected, RecordFieldType.UUID.getDataType(), "uuid_test");
|
||||
assertTrue(result instanceof UUID);
|
||||
assertEquals(generated, result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testByteArrayToUUID() {
|
||||
UUID generated = UUID.randomUUID();
|
||||
ByteBuffer buffer = ByteBuffer.allocate(16);
|
||||
buffer.putLong(generated.getMostSignificantBits());
|
||||
buffer.putLong(generated.getLeastSignificantBits());
|
||||
byte[] expected = buffer.array();
|
||||
|
||||
Object result = DataTypeUtils.convertType(expected, RecordFieldType.ARRAY.getDataType(), "uuid_test");
|
||||
assertTrue(result instanceof Byte[]);
|
||||
assertEquals( 16, ((Byte[]) result).length);
|
||||
Byte[] bytes = (Byte[])result;
|
||||
for (int x = 0; x < bytes.length; x++) {
|
||||
byte current = bytes[x];
|
||||
assertEquals(expected[x], current);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertRecordArrayToJavaArray() {
|
||||
assertNull(DataTypeUtils.convertRecordArrayToJavaArray(null, null));
|
||||
|
|
|
@ -89,6 +89,7 @@ public class AvroTypeUtil {
|
|||
private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis";
|
||||
private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros";
|
||||
private static final String LOGICAL_TYPE_DECIMAL = "decimal";
|
||||
private static final String LOGICAL_TYPE_UUID = "uuid";
|
||||
|
||||
|
||||
public static Schema extractAvroSchema(final RecordSchema recordSchema) {
|
||||
|
@ -299,6 +300,10 @@ public class AvroTypeUtil {
|
|||
schema = Schema.create(Type.LONG);
|
||||
LogicalTypes.timestampMillis().addToSchema(schema);
|
||||
break;
|
||||
case UUID:
|
||||
schema = Schema.create(Type.STRING);
|
||||
LogicalTypes.uuid().addToSchema(schema);
|
||||
break;
|
||||
case ENUM:
|
||||
final EnumDataType enumType = (EnumDataType) dataType;
|
||||
schema = Schema.createEnum(fieldName, "", "org.apache.nifi", enumType.getEnums());
|
||||
|
@ -362,6 +367,8 @@ public class AvroTypeUtil {
|
|||
case LOGICAL_TYPE_DECIMAL:
|
||||
final LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
|
||||
return RecordFieldType.DECIMAL.getDecimalDataType(decimal.getPrecision(), decimal.getScale());
|
||||
case LOGICAL_TYPE_UUID:
|
||||
return RecordFieldType.UUID.getDataType();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -186,6 +186,41 @@ public class TestAvroTypeUtil {
|
|||
assertEquals(0L, avroRecord.get("number"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUUIDSupport() {
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField("uuidv4", RecordFieldType.UUID.getDataType(), false));
|
||||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
final Schema result = AvroTypeUtil.extractAvroSchema(schema);
|
||||
assertNotNull(result);
|
||||
assertEquals(1, result.getFields().size());
|
||||
assertNotNull(result.getFields().get(0));
|
||||
assertEquals(LogicalTypes.uuid(), result.getFields().get(0).schema().getLogicalType());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAvroUUIDToRecordApiUUID() {
|
||||
String avroSchema = "{\n" +
|
||||
"\t\"type\": \"record\",\n" +
|
||||
"\t\"name\": \"test\",\n" +
|
||||
"\t\"fields\": [\n" +
|
||||
"\t\t{\n" +
|
||||
"\t\t\t\"name\": \"uuid_test\",\n" +
|
||||
"\t\t\t\"type\": {\n" +
|
||||
"\t\t\t\t\"type\": \"string\",\n" +
|
||||
"\t\t\t\t\"logicalType\": \"uuid\"\n" +
|
||||
"\t\t\t}\n" +
|
||||
"\t\t}\n" +
|
||||
"\t]\n" +
|
||||
"}";
|
||||
Schema schema = new Schema.Parser().parse(avroSchema);
|
||||
RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
|
||||
Optional<RecordField> uuidField = recordSchema.getField("uuid_test");
|
||||
assertTrue(uuidField.isPresent());
|
||||
RecordField field = uuidField.get();
|
||||
assertTrue(field.getDataType() == RecordFieldType.UUID.getDataType());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateAvroSchemaPrimitiveTypes() {
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
|
|
|
@ -184,6 +184,7 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
|
|||
switch (dataType.getFieldType()) {
|
||||
case DATE:
|
||||
case TIME:
|
||||
case UUID:
|
||||
case TIMESTAMP:
|
||||
try {
|
||||
return DataTypeUtils.convertType(value, dataType, getLazyDateFormat(), getLazyTimeFormat(), getLazyTimestampFormat(), fieldName);
|
||||
|
|
|
@ -198,6 +198,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
|
|||
case ENUM:
|
||||
case DATE:
|
||||
case TIME:
|
||||
case UUID:
|
||||
case TIMESTAMP: {
|
||||
final Object rawValue = getRawNodeValue(fieldNode, fieldName);
|
||||
return DataTypeUtils.convertType(rawValue, desiredType, getLazyDateFormat(), getLazyTimeFormat(), getLazyTimestampFormat(), fieldName);
|
||||
|
|
|
@ -374,6 +374,7 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
|
|||
case SHORT:
|
||||
generator.writeNumber(DataTypeUtils.toInteger(coercedValue, fieldName));
|
||||
break;
|
||||
case UUID:
|
||||
case CHAR:
|
||||
case STRING:
|
||||
generator.writeString(coercedValue.toString());
|
||||
|
|
|
@ -263,6 +263,7 @@ public class WriteXMLResult extends AbstractRecordSetWriter implements RecordSet
|
|||
case INT:
|
||||
case LONG:
|
||||
case SHORT:
|
||||
case UUID:
|
||||
case STRING: {
|
||||
writeAllTags(tagsToOpen, fieldName);
|
||||
writer.writeCharacters(coercedValue.toString());
|
||||
|
|
|
@ -154,6 +154,7 @@ public class XMLRecordReader implements RecordReader {
|
|||
case STRING:
|
||||
case DATE:
|
||||
case TIME:
|
||||
case UUID:
|
||||
case TIMESTAMP: {
|
||||
|
||||
StringBuilder content = new StringBuilder();
|
||||
|
|
|
@ -134,6 +134,7 @@ public class TestWriteCSVResult {
|
|||
valueMap.put("choice", 48L);
|
||||
valueMap.put("array", null);
|
||||
valueMap.put("enum", null);
|
||||
valueMap.put("uuid", "8bb20bf2-ec41-4b94-80a4-922f4dba009c");
|
||||
|
||||
final Record record = new MapRecord(schema, valueMap);
|
||||
final RecordSet rs = RecordSet.of(schema, record);
|
||||
|
@ -156,7 +157,8 @@ public class TestWriteCSVResult {
|
|||
|
||||
final String values = splits[1];
|
||||
final StringBuilder expectedBuilder = new StringBuilder();
|
||||
expectedBuilder.append("\"true\",\"1\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",\"8.1\",\"" + timestampValue + "\",\"" + dateValue + "\",\"" + timeValue + "\",\"c\",,\"a孟bc李12儒3\",,\"48\",,");
|
||||
expectedBuilder.append("\"true\",\"1\",\"8\",\"9\",\"8\",\"8\",\"8.0\",\"8.0\",\"8.1\",\"" + timestampValue +
|
||||
"\",\"" + dateValue + "\",\"" + timeValue + "\",\"8bb20bf2-ec41-4b94-80a4-922f4dba009c\",\"c\",,\"a孟bc李12儒3\",,\"48\",,");
|
||||
|
||||
final String expectedValues = expectedBuilder.toString();
|
||||
|
||||
|
|
|
@ -102,6 +102,7 @@ class TestWriteJsonResult {
|
|||
valueMap.put("enum", null);
|
||||
valueMap.put("choice", 48L);
|
||||
valueMap.put("map", map);
|
||||
valueMap.put("uuid", "8bb20bf2-ec41-4b94-80a4-922f4dba009c");
|
||||
|
||||
final Record record = new MapRecord(schema, valueMap);
|
||||
final RecordSet rs = RecordSet.of(schema, record);
|
||||
|
|
|
@ -197,6 +197,7 @@ public class TestWriteXMLResult {
|
|||
valueMap.put("enum", null);
|
||||
valueMap.put("choice", 48L);
|
||||
valueMap.put("map", map);
|
||||
valueMap.put("uuid", "8bb20bf2-ec41-4b94-80a4-922f4dba009c");
|
||||
|
||||
final Record record = new MapRecord(schema, valueMap);
|
||||
final RecordSet rs = RecordSet.of(schema, record);
|
||||
|
@ -207,7 +208,7 @@ public class TestWriteXMLResult {
|
|||
writer.write(rs);
|
||||
writer.flush();
|
||||
|
||||
String xmlResult = "<ROOT><RECORD><string>string</string><boolean>true</boolean><byte>1</byte><char>c</char><enum /><short>8</short>" +
|
||||
String xmlResult = "<ROOT><RECORD><string>string</string><boolean>true</boolean><byte>1</byte><uuid>8bb20bf2-ec41-4b94-80a4-922f4dba009c</uuid><char>c</char><enum /><short>8</short>" +
|
||||
"<int>9</int><bigint>8</bigint><long>8</long><float>8.0</float><double>8.0</double><decimal>8.1</decimal>" +
|
||||
"<date>2017-01-01</date><time>17:00:00</time><timestamp>2017-01-01 17:00:00</timestamp><record /><choice>48</choice><array />" +
|
||||
"<map><height>48</height><width>96</width></map></RECORD></ROOT>";
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
"timestamp" : "2017-01-01 17:00:00",
|
||||
"date" : "2017-01-01",
|
||||
"time" : "17:00:00",
|
||||
"uuid": "8bb20bf2-ec41-4b94-80a4-922f4dba009c",
|
||||
"char" : "c",
|
||||
"enum" : null,
|
||||
"string" : "string",
|
||||
|
|
Loading…
Reference in New Issue