NIFI-1663: Add ConvertAvroToORC processor

This closes #727
This commit is contained in:
Matt Burgess 2016-07-26 23:25:11 -04:00 committed by Oleg Zhurakousky
parent bb24312709
commit d9720239f5
8 changed files with 3996 additions and 14 deletions

View File

@ -44,10 +44,6 @@
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-orc</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
@ -191,11 +187,6 @@
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View File

@ -0,0 +1,466 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.io.orc;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_SIZE;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT;
/**
* Utility methods for ORC support (conversion from Avro, conversion to Hive types, e.g.
*/
public class NiFiOrcUtils {
public static Object convertToORCObject(TypeInfo typeInfo, Object o) {
if (o != null) {
if (typeInfo instanceof UnionTypeInfo) {
OrcUnion union = new OrcUnion();
// Need to find which of the union types correspond to the primitive object
TypeInfo objectTypeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(
ObjectInspectorFactory.getReflectionObjectInspector(o.getClass(), ObjectInspectorFactory.ObjectInspectorOptions.JAVA));
List<TypeInfo> unionTypeInfos = ((UnionTypeInfo) typeInfo).getAllUnionObjectTypeInfos();
int index = 0;
while (index < unionTypeInfos.size() && !unionTypeInfos.get(index).equals(objectTypeInfo)) {
index++;
}
if (index < unionTypeInfos.size()) {
union.set((byte) index, convertToORCObject(objectTypeInfo, o));
} else {
throw new IllegalArgumentException("Object Type for class " + o.getClass().getName() + " not in Union declaration");
}
return union;
}
if (o instanceof Integer) {
return new IntWritable((int) o);
}
if (o instanceof Boolean) {
return new BooleanWritable((boolean) o);
}
if (o instanceof Long) {
return new LongWritable((long) o);
}
if (o instanceof Float) {
return new FloatWritable((float) o);
}
if (o instanceof Double) {
return new DoubleWritable((double) o);
}
if (o instanceof String || o instanceof Utf8 || o instanceof GenericData.EnumSymbol) {
return new Text(o.toString());
}
if (o instanceof ByteBuffer) {
return new BytesWritable(((ByteBuffer) o).array());
}
if (o instanceof int[]) {
int[] intArray = (int[]) o;
return Arrays.stream(intArray)
.mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("int"), element))
.collect(Collectors.toList());
}
if (o instanceof long[]) {
long[] longArray = (long[]) o;
return Arrays.stream(longArray)
.mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("bigint"), element))
.collect(Collectors.toList());
}
if (o instanceof float[]) {
float[] floatArray = (float[]) o;
return IntStream.range(0, floatArray.length)
.mapToDouble(i -> floatArray[i])
.mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("float"), (float) element))
.collect(Collectors.toList());
}
if (o instanceof double[]) {
double[] doubleArray = (double[]) o;
return Arrays.stream(doubleArray)
.mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("double"), element))
.collect(Collectors.toList());
}
if (o instanceof boolean[]) {
boolean[] booleanArray = (boolean[]) o;
return IntStream.range(0, booleanArray.length)
.map(i -> booleanArray[i] ? 1 : 0)
.mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("boolean"), element == 1))
.collect(Collectors.toList());
}
if (o instanceof GenericData.Array) {
GenericData.Array array = ((GenericData.Array) o);
// The type information in this case is interpreted as a List
TypeInfo listTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo();
return array.stream().map((element) -> convertToORCObject(listTypeInfo, element)).collect(Collectors.toList());
}
if (o instanceof List) {
return o;
}
if (o instanceof Map) {
MapWritable mapWritable = new MapWritable();
TypeInfo keyInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo();
TypeInfo valueInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo();
// Unions are not allowed as key/value types, so if we convert the key and value objects,
// they should return Writable objects
((Map) o).forEach((key, value) -> {
Object keyObject = convertToORCObject(keyInfo, key);
Object valueObject = convertToORCObject(valueInfo, value);
if (keyObject == null
|| !(keyObject instanceof Writable)
|| !(valueObject instanceof Writable)
) {
throw new IllegalArgumentException("Maps may only contain Writable types, and the key cannot be null");
}
mapWritable.put((Writable) keyObject, (Writable) valueObject);
});
return mapWritable;
}
}
return null;
}
/**
* Create an object of OrcStruct given a TypeInfo and a list of objects
*
* @param typeInfo The TypeInfo object representing the ORC record schema
* @param objs ORC objects/Writables
* @return an OrcStruct containing the specified objects for the specified schema
*/
public static OrcStruct createOrcStruct(TypeInfo typeInfo, Object... objs) {
SettableStructObjectInspector oi = (SettableStructObjectInspector) OrcStruct
.createObjectInspector(typeInfo);
List<StructField> fields = (List<StructField>) oi.getAllStructFieldRefs();
OrcStruct result = (OrcStruct) oi.create();
result.setNumFields(fields.size());
for (int i = 0; i < fields.size(); i++) {
oi.setStructFieldData(result, fields.get(i), objs[i]);
}
return result;
}
public static String normalizeHiveTableName(String name) {
return name.replaceAll("[\\. ]", "_");
}
public static String generateHiveDDL(Schema avroSchema, String tableName) {
Schema.Type schemaType = avroSchema.getType();
StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT EXISTS ");
sb.append(tableName);
sb.append(" (");
if (Schema.Type.RECORD.equals(schemaType)) {
List<String> hiveColumns = new ArrayList<>();
List<Schema.Field> fields = avroSchema.getFields();
if (fields != null) {
hiveColumns.addAll(
fields.stream().map(field -> field.name() + " " + getHiveTypeFromAvroType(field.schema())).collect(Collectors.toList()));
}
sb.append(StringUtils.join(hiveColumns, ", "));
sb.append(") STORED AS ORC");
return sb.toString();
} else {
throw new IllegalArgumentException("Avro schema is of type " + schemaType.getName() + ", not RECORD");
}
}
public static TypeInfo getOrcField(Schema fieldSchema) throws IllegalArgumentException {
Schema.Type fieldType = fieldSchema.getType();
switch (fieldType) {
case INT:
case LONG:
case BOOLEAN:
case BYTES:
case DOUBLE:
case FLOAT:
case STRING:
return getPrimitiveOrcTypeFromPrimitiveAvroType(fieldType);
case UNION:
List<Schema> unionFieldSchemas = fieldSchema.getTypes();
if (unionFieldSchemas != null) {
// Ignore null types in union
List<TypeInfo> orcFields = unionFieldSchemas.stream().filter(
unionFieldSchema -> !Schema.Type.NULL.equals(unionFieldSchema.getType()))
.map(NiFiOrcUtils::getOrcField)
.collect(Collectors.toList());
// Flatten the field if the union only has one non-null element
if (orcFields.size() == 1) {
return orcFields.get(0);
} else {
return TypeInfoFactory.getUnionTypeInfo(orcFields);
}
}
return null;
case ARRAY:
return TypeInfoFactory.getListTypeInfo(getOrcField(fieldSchema.getElementType()));
case MAP:
return TypeInfoFactory.getMapTypeInfo(
getPrimitiveOrcTypeFromPrimitiveAvroType(Schema.Type.STRING),
getOrcField(fieldSchema.getValueType()));
case RECORD:
List<Schema.Field> avroFields = fieldSchema.getFields();
if (avroFields != null) {
List<String> orcFieldNames = new ArrayList<>(avroFields.size());
List<TypeInfo> orcFields = new ArrayList<>(avroFields.size());
avroFields.forEach(avroField -> {
String fieldName = avroField.name();
orcFieldNames.add(fieldName);
orcFields.add(getOrcField(avroField.schema()));
});
return TypeInfoFactory.getStructTypeInfo(orcFieldNames, orcFields);
}
return null;
case ENUM:
// An enum value is just a String for ORC/Hive
return getPrimitiveOrcTypeFromPrimitiveAvroType(Schema.Type.STRING);
default:
throw new IllegalArgumentException("Did not recognize Avro type " + fieldType.getName());
}
}
public static Schema.Type getAvroSchemaTypeOfObject(Object o) {
if (o == null) {
return Schema.Type.NULL;
} else if (o instanceof Integer) {
return Schema.Type.INT;
} else if (o instanceof Long) {
return Schema.Type.LONG;
} else if (o instanceof Boolean) {
return Schema.Type.BOOLEAN;
} else if (o instanceof byte[]) {
return Schema.Type.BYTES;
} else if (o instanceof Float) {
return Schema.Type.FLOAT;
} else if (o instanceof Double) {
return Schema.Type.DOUBLE;
} else if (o instanceof Enum) {
return Schema.Type.ENUM;
} else if (o instanceof Object[]) {
return Schema.Type.ARRAY;
} else if (o instanceof List) {
return Schema.Type.ARRAY;
} else if (o instanceof Map) {
return Schema.Type.MAP;
} else {
throw new IllegalArgumentException("Object of class " + o.getClass() + " is not a supported Avro Type");
}
}
public static TypeInfo getPrimitiveOrcTypeFromPrimitiveAvroType(Schema.Type avroType) throws IllegalArgumentException {
if (avroType == null) {
throw new IllegalArgumentException("Avro type is null");
}
switch (avroType) {
case INT:
return TypeInfoFactory.getPrimitiveTypeInfo("int");
case LONG:
return TypeInfoFactory.getPrimitiveTypeInfo("bigint");
case BOOLEAN:
return TypeInfoFactory.getPrimitiveTypeInfo("boolean");
case BYTES:
return TypeInfoFactory.getPrimitiveTypeInfo("binary");
case DOUBLE:
return TypeInfoFactory.getPrimitiveTypeInfo("double");
case FLOAT:
return TypeInfoFactory.getPrimitiveTypeInfo("float");
case STRING:
return TypeInfoFactory.getPrimitiveTypeInfo("string");
default:
throw new IllegalArgumentException("Avro type " + avroType.getName() + " is not a primitive type");
}
}
public static String getHiveTypeFromAvroType(Schema avroSchema) {
if (avroSchema == null) {
throw new IllegalArgumentException("Avro schema is null");
}
Schema.Type avroType = avroSchema.getType();
switch (avroType) {
case INT:
return "INT";
case LONG:
return "BIGINT";
case BOOLEAN:
return "BOOLEAN";
case BYTES:
return "BINARY";
case DOUBLE:
return "DOUBLE";
case FLOAT:
return "FLOAT";
case STRING:
case ENUM:
return "STRING";
case UNION:
List<Schema> unionFieldSchemas = avroSchema.getTypes();
if (unionFieldSchemas != null) {
List<String> hiveFields = new ArrayList<>();
for (Schema unionFieldSchema : unionFieldSchemas) {
Schema.Type unionFieldSchemaType = unionFieldSchema.getType();
// Ignore null types in union
if (!Schema.Type.NULL.equals(unionFieldSchemaType)) {
hiveFields.add(getHiveTypeFromAvroType(unionFieldSchema));
}
}
// Flatten the field if the union only has one non-null element
return (hiveFields.size() == 1)
? hiveFields.get(0)
: "UNIONTYPE<" + StringUtils.join(hiveFields, ", ") + ">";
}
break;
case MAP:
return "MAP<STRING, " + getHiveTypeFromAvroType(avroSchema.getValueType()) + ">";
case ARRAY:
return "ARRAY<" + getHiveTypeFromAvroType(avroSchema.getElementType()) + ">";
case RECORD:
List<Schema.Field> recordFields = avroSchema.getFields();
if (recordFields != null) {
List<String> hiveFields = recordFields.stream().map(
recordField -> recordField.name() + ":" + getHiveTypeFromAvroType(recordField.schema())).collect(Collectors.toList());
return "STRUCT<" + StringUtils.join(hiveFields, ", ") + ">";
}
break;
default:
break;
}
throw new IllegalArgumentException("Error converting Avro type " + avroType.getName() + " to Hive type");
}
public static OrcFlowFileWriter createWriter(OutputStream flowFileOutputStream,
Path path,
Configuration conf,
TypeInfo orcSchema,
long stripeSize,
CompressionKind compress,
int bufferSize) throws IOException {
int rowIndexStride = HiveConf.getIntVar(conf, HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE);
boolean addBlockPadding = HiveConf.getBoolVar(conf, HIVE_ORC_DEFAULT_BLOCK_PADDING);
String versionName = HiveConf.getVar(conf, HIVE_ORC_WRITE_FORMAT);
OrcFile.Version versionValue = (versionName == null)
? OrcFile.Version.CURRENT
: OrcFile.Version.byName(versionName);
OrcFile.EncodingStrategy encodingStrategy;
String enString = conf.get(HiveConf.ConfVars.HIVE_ORC_ENCODING_STRATEGY.varname);
if (enString == null) {
encodingStrategy = OrcFile.EncodingStrategy.SPEED;
} else {
encodingStrategy = OrcFile.EncodingStrategy.valueOf(enString);
}
OrcFile.CompressionStrategy compressionStrategy;
String compString = conf.get(HiveConf.ConfVars.HIVE_ORC_COMPRESSION_STRATEGY.varname);
if (compString == null) {
compressionStrategy = OrcFile.CompressionStrategy.SPEED;
} else {
compressionStrategy = OrcFile.CompressionStrategy.valueOf(compString);
}
float paddingTolerance;
paddingTolerance = conf.getFloat(HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.varname,
HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.defaultFloatVal);
long blockSizeValue = HiveConf.getLongVar(conf, HIVE_ORC_DEFAULT_BLOCK_SIZE);
double bloomFilterFpp = BloomFilterIO.DEFAULT_FPP;
ObjectInspector inspector = OrcStruct.createObjectInspector(orcSchema);
return new OrcFlowFileWriter(flowFileOutputStream,
path,
conf,
inspector,
stripeSize,
compress,
bufferSize,
rowIndexStride,
getMemoryManager(conf),
addBlockPadding,
versionValue,
null, // no callback
encodingStrategy,
compressionStrategy,
paddingTolerance,
blockSizeValue,
null, // no Bloom Filter column names
bloomFilterFpp);
}
private static MemoryManager memoryManager = null;
private static synchronized MemoryManager getMemoryManager(Configuration conf) {
if (memoryManager == null) {
memoryManager = new MemoryManager(conf);
}
return memoryManager;
}
}

View File

@ -0,0 +1,289 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hive;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
import org.apache.hadoop.hive.ql.io.orc.OrcFlowFileWriter;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.hive.HiveJdbcCommon;
import org.apache.nifi.util.hive.HiveUtils;
import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
* The ConvertAvroToORC processor takes an Avro-formatted flow file as input and converts it into ORC format.
*/
@SideEffectFree
@SupportsBatching
@Tags({"avro", "orc", "hive", "convert"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Converts an Avro record into ORC file format. This processor provides a direct mapping of an Avro record to an ORC record, such "
+ "that the resulting ORC file will have the same hierarchical structure as the Avro document. If an incoming FlowFile contains a stream of "
+ "multiple Avro records, the resultant FlowFile will contain a ORC file containing all of the Avro records. If an incoming FlowFile does "
+ "not contain any records, an empty ORC file is the output.")
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "Sets the mime type to application/octet-stream"),
@WritesAttribute(attribute = "filename", description = "Sets the filename to the existing filename with the extension replaced by / added to by .orc"),
@WritesAttribute(attribute = "record.count", description = "Sets the number of records in the ORC file."),
@WritesAttribute(attribute = "hive.ddl", description = "Creates a partial Hive DDL statement for creating a table in Hive from this ORC file. "
+ "This can be used in ReplaceText for setting the content to the DDL. To make it valid DDL, add \"LOCATION '<path_to_orc_file_in_hdfs>'\", where "
+ "the path is the directory that contains this ORC file on HDFS. For example, ConvertAvroToORC can send flow files to a PutHDFS processor to send the file to "
+ "HDFS, then to a ReplaceText to set the content to this DDL (plus the LOCATION clause as described), then to PutHiveQL processor to create the table "
+ "if it doesn't exist.")
})
public class ConvertAvroToORC extends AbstractProcessor {
// Attributes
public static final String ORC_MIME_TYPE = "application/octet-stream";
public static final String HIVE_DDL_ATTRIBUTE = "hive.ddl";
public static final String RECORD_COUNT_ATTRIBUTE = "record.count";
// Properties
public static final PropertyDescriptor ORC_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
.name("orc-config-resources")
.displayName("ORC Configuration Resources")
.description("A file or comma separated list of files which contains the ORC configuration (hive-site.xml, e.g.). Without this, Hadoop "
+ "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Please see the ORC documentation for more details.")
.required(false).addValidator(HiveUtils.createMultipleFilesExistValidator()).build();
public static final PropertyDescriptor STRIPE_SIZE = new PropertyDescriptor.Builder()
.name("orc-stripe-size")
.displayName("Stripe Size")
.description("The size of the memory buffer (in bytes) for writing stripes to an ORC file")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("100 KB")
.build();
public static final PropertyDescriptor BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("orc-buffer-size")
.displayName("Buffer Size")
.description("The maximum size of the memory buffers (in bytes) used for compressing and storing a stripe in memory. This is a hint to the ORC writer, "
+ "which may choose to use a smaller buffer size based on stripe size and number of columns for efficient stripe writing and memory utilization.")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("10 KB")
.build();
public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
.name("orc-compression-type")
.displayName("Compression Type")
.required(true)
.allowableValues("NONE", "ZLIB", "SNAPPY", "LZO")
.defaultValue("NONE")
.build();
public static final PropertyDescriptor HIVE_TABLE_NAME = new PropertyDescriptor.Builder()
.name("orc-hive-table-name")
.displayName("Hive Table Name")
.description("An optional table name to insert into the hive.ddl attribute. The generated DDL can be used by "
+ "a PutHiveQL processor (presumably after a PutHDFS processor) to create a table backed by the converted ORC file. "
+ "If this property is not provided, the full name (including namespace) of the incoming Avro record will be normalized "
+ "and used as the table name.")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
// Relationships
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after it has been converted to ORC format.")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to ORC for any reason")
.build();
private final static List<PropertyDescriptor> propertyDescriptors;
private final static Set<Relationship> relationships;
private volatile Configuration orcConfig;
/*
* Will ensure that the list of property descriptors is built only once.
* Will also create a Set of relationships
*/
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.add(ORC_CONFIGURATION_RESOURCES);
_propertyDescriptors.add(STRIPE_SIZE);
_propertyDescriptors.add(BUFFER_SIZE);
_propertyDescriptors.add(COMPRESSION_TYPE);
_propertyDescriptors.add(HIVE_TABLE_NAME);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
_relationships.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(_relationships);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@OnScheduled
public void setup(ProcessContext context) {
boolean confFileProvided = context.getProperty(ORC_CONFIGURATION_RESOURCES).isSet();
if (confFileProvided) {
final String configFiles = context.getProperty(ORC_CONFIGURATION_RESOURCES).getValue();
orcConfig = HiveJdbcCommon.getConfigurationFromFiles(configFiles);
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
try {
long startTime = System.currentTimeMillis();
final long stripeSize = context.getProperty(STRIPE_SIZE).asDataSize(DataUnit.B).longValue();
final int bufferSize = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final CompressionKind compressionType = CompressionKind.valueOf(context.getProperty(COMPRESSION_TYPE).getValue());
final AtomicReference<Schema> hiveAvroSchema = new AtomicReference<>(null);
final AtomicInteger totalRecordCount = new AtomicInteger(0);
final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
flowFile = session.write(flowFile, (rawIn, rawOut) -> {
try (final InputStream in = new BufferedInputStream(rawIn);
final OutputStream out = new BufferedOutputStream(rawOut);
final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<>())) {
// Create ORC schema from Avro schema
Schema avroSchema = reader.getSchema();
TypeInfo orcSchema = NiFiOrcUtils.getOrcField(avroSchema);
if (orcConfig == null) {
orcConfig = new Configuration();
}
OrcFlowFileWriter orcWriter = NiFiOrcUtils.createWriter(
out,
new Path(fileName),
orcConfig,
orcSchema,
stripeSize,
compressionType,
bufferSize);
try {
int recordCount = 0;
GenericRecord currRecord = null;
while (reader.hasNext()) {
currRecord = reader.next(currRecord);
List<Schema.Field> fields = currRecord.getSchema().getFields();
if (fields != null) {
Object[] row = new Object[fields.size()];
for (int i = 0; i < fields.size(); i++) {
Schema.Field field = fields.get(i);
Schema fieldSchema = field.schema();
Object o = currRecord.get(field.name());
try {
row[i] = NiFiOrcUtils.convertToORCObject(NiFiOrcUtils.getOrcField(fieldSchema), o);
} catch (ArrayIndexOutOfBoundsException aioobe) {
getLogger().error("Index out of bounds at record {} for column {}, type {}, and object {}",
new Object[]{recordCount, i, fieldSchema.getType().getName(), o.toString()},
aioobe);
throw new IOException(aioobe);
}
}
orcWriter.addRow(NiFiOrcUtils.createOrcStruct(orcSchema, row));
recordCount++;
}
}
hiveAvroSchema.set(avroSchema);
totalRecordCount.set(recordCount);
} finally {
// finished writing this record, close the writer (which will flush to the flow file)
orcWriter.close();
}
}
});
final String hiveTableName = context.getProperty(HIVE_TABLE_NAME).isSet()
? context.getProperty(HIVE_TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue()
: NiFiOrcUtils.normalizeHiveTableName(hiveAvroSchema.get().getFullName());
String hiveDDL = NiFiOrcUtils.generateHiveDDL(hiveAvroSchema.get(), hiveTableName);
// Add attributes and transfer to success
flowFile = session.putAttribute(flowFile, RECORD_COUNT_ATTRIBUTE, Integer.toString(totalRecordCount.get()));
flowFile = session.putAttribute(flowFile, HIVE_DDL_ATTRIBUTE, hiveDDL);
StringBuilder newFilename = new StringBuilder();
int extensionIndex = fileName.lastIndexOf(".");
if (extensionIndex != -1) {
newFilename.append(fileName.substring(0, extensionIndex));
} else {
newFilename.append(fileName);
}
newFilename.append(".orc");
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), ORC_MIME_TYPE);
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename.toString());
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().modifyContent(flowFile, "Converted "+totalRecordCount.get()+" records", System.currentTimeMillis() - startTime);
} catch (final ProcessException pe) {
getLogger().error("Failed to convert {} from Avro to ORC due to {}; transferring to failure", new Object[]{flowFile, pe});
session.transfer(flowFile, REL_FAILURE);
}
}
}

View File

@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.hive.ConvertAvroToORC
org.apache.nifi.processors.hive.SelectHiveQL
org.apache.nifi.processors.hive.PutHiveQL
org.apache.nifi.processors.hive.PutHiveStreaming

View File

@ -0,0 +1,214 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hive;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.orc.TestNiFiOrcUtils;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for ConvertAvroToORC processor
*/
public class TestConvertAvroToORC {
private ConvertAvroToORC processor;
private TestRunner runner;
@Before
public void setUp() throws Exception {
processor = new ConvertAvroToORC();
runner = TestRunners.newTestRunner(processor);
}
@Test
public void test_Setup() throws Exception {
}
@Test
public void test_onTrigger_primitive_record() throws Exception {
GenericData.Record record = TestNiFiOrcUtils.buildPrimitiveAvroRecord(10, 20L, true, 30.0f, 40, StandardCharsets.UTF_8.encode("Hello"), "World");
DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(record.getSchema());
DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer);
ByteArrayOutputStream out = new ByteArrayOutputStream();
fileWriter.create(record.getSchema(), out);
fileWriter.append(record);
// Put another record in
record = TestNiFiOrcUtils.buildPrimitiveAvroRecord(1, 2L, false, 3.0f, 4L, StandardCharsets.UTF_8.encode("I am"), "another record");
fileWriter.append(record);
// And one more
record = TestNiFiOrcUtils.buildPrimitiveAvroRecord(100, 200L, true, 300.0f, 400L, StandardCharsets.UTF_8.encode("Me"), "too!");
fileWriter.append(record);
fileWriter.flush();
fileWriter.close();
out.close();
Map<String, String> attributes = new HashMap<String, String>() {{
put(CoreAttributes.FILENAME.key(), "test.avro");
}};
runner.enqueue(out.toByteArray(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1);
// Write the flow file out to disk, since the ORC Reader needs a path
MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0);
assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS test_record (int INT, long BIGINT, boolean BOOLEAN, float FLOAT, double DOUBLE, bytes BINARY, string STRING)"
+ " STORED AS ORC", resultFlowFile.getAttribute(ConvertAvroToORC.HIVE_DDL_ATTRIBUTE));
assertEquals("3", resultFlowFile.getAttribute(ConvertAvroToORC.RECORD_COUNT_ATTRIBUTE));
assertEquals("test.orc", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
FileOutputStream fos = new FileOutputStream("target/test1.orc");
fos.write(resultContents);
fos.flush();
fos.close();
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
Reader reader = OrcFile.createReader(new Path("target/test1.orc"), OrcFile.readerOptions(conf).filesystem(fs));
RecordReader rows = reader.rows();
Object o = rows.next(null);
assertNotNull(o);
assertTrue(o instanceof OrcStruct);
TypeInfo resultSchema = TestNiFiOrcUtils.buildPrimitiveOrcSchema();
StructObjectInspector inspector = (StructObjectInspector) OrcStruct.createObjectInspector(resultSchema);
// Check some fields in the first row
Object intFieldObject = inspector.getStructFieldData(o, inspector.getStructFieldRef("int"));
assertTrue(intFieldObject instanceof IntWritable);
assertEquals(10, ((IntWritable) intFieldObject).get());
Object stringFieldObject = inspector.getStructFieldData(o, inspector.getStructFieldRef("string"));
assertTrue(stringFieldObject instanceof Text);
assertEquals("World", stringFieldObject.toString());
}
@Test
public void test_onTrigger_complex_record() throws Exception {
Map<String, Double> mapData1 = new TreeMap<String, Double>() {{
put("key1", 1.0);
put("key2", 2.0);
}};
GenericData.Record record = TestNiFiOrcUtils.buildComplexAvroRecord(10, mapData1, "DEF", 3.0f, Arrays.asList(10, 20));
DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(record.getSchema());
DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer);
ByteArrayOutputStream out = new ByteArrayOutputStream();
fileWriter.create(record.getSchema(), out);
fileWriter.append(record);
// Put another record in
Map<String, Double> mapData2 = new TreeMap<String, Double>() {{
put("key1", 3.0);
put("key2", 4.0);
}};
record = TestNiFiOrcUtils.buildComplexAvroRecord(null, mapData2, "XYZ", 4L, Arrays.asList(100, 200));
fileWriter.append(record);
fileWriter.flush();
fileWriter.close();
out.close();
Map<String, String> attributes = new HashMap<String, String>() {{
put(CoreAttributes.FILENAME.key(), "test");
}};
runner.enqueue(out.toByteArray(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1);
// Write the flow file out to disk, since the ORC Reader needs a path
MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0);
assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS complex_record " +
"(myInt INT, myMap MAP<STRING, DOUBLE>, myEnum STRING, myLongOrFloat UNIONTYPE<BIGINT, FLOAT>, myIntList ARRAY<INT>)"
+ " STORED AS ORC", resultFlowFile.getAttribute(ConvertAvroToORC.HIVE_DDL_ATTRIBUTE));
assertEquals("2", resultFlowFile.getAttribute(ConvertAvroToORC.RECORD_COUNT_ATTRIBUTE));
assertEquals("test.orc", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
FileOutputStream fos = new FileOutputStream("target/test1.orc");
fos.write(resultContents);
fos.flush();
fos.close();
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
Reader reader = OrcFile.createReader(new Path("target/test1.orc"), OrcFile.readerOptions(conf).filesystem(fs));
RecordReader rows = reader.rows();
Object o = rows.next(null);
assertNotNull(o);
assertTrue(o instanceof OrcStruct);
TypeInfo resultSchema = TestNiFiOrcUtils.buildComplexOrcSchema();
StructObjectInspector inspector = (StructObjectInspector) OrcStruct.createObjectInspector(resultSchema);
// Check some fields in the first row
Object intFieldObject = inspector.getStructFieldData(o, inspector.getStructFieldRef("myInt"));
assertTrue(intFieldObject instanceof IntWritable);
assertEquals(10, ((IntWritable) intFieldObject).get());
// This is pretty awkward and messy. The map object is a Map (not a MapWritable) but the keys are writables (in this case Text)
// and so are the values (DoubleWritables in this case).
Object mapFieldObject = inspector.getStructFieldData(o, inspector.getStructFieldRef("myMap"));
assertTrue(mapFieldObject instanceof Map);
Map map = (Map) mapFieldObject;
Object mapValue = map.get(new Text("key1"));
assertNotNull(mapValue);
assertTrue(mapValue instanceof DoubleWritable);
assertEquals(1.0, ((DoubleWritable) mapValue).get(), Double.MIN_VALUE);
mapValue = map.get(new Text("key2"));
assertNotNull(mapValue);
assertTrue(mapValue instanceof DoubleWritable);
assertEquals(2.0, ((DoubleWritable) mapValue).get(), Double.MIN_VALUE);
}
}

View File

@ -684,12 +684,12 @@ public class TestPutHiveStreaming {
@Override
protected void closeConnection() throws InterruptedException {
// TODO
// Empty
}
@Override
protected void commitTxn() throws CommitFailure, InterruptedException {
// TODO
// Empty
}
@Override
@ -700,17 +700,17 @@ public class TestPutHiveStreaming {
@Override
protected void closeTxnBatch() throws InterruptedException {
// TODO
// Empty
}
@Override
protected void abortTxn() throws InterruptedException {
// TODO
// Empty
}
@Override
protected void nextTxn(boolean rollToNext) throws StreamingException, InterruptedException, TxnBatchFailure {
// TODO
// Empty
}
}

View File

@ -0,0 +1,371 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util.orc;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for the NiFiOrcUtils helper class
*/
public class TestNiFiOrcUtils {
@Test
public void test_getOrcField_primitive() throws Exception {
// Expected ORC types
TypeInfo[] expectedTypes = {
TypeInfoFactory.getPrimitiveTypeInfo("int"),
TypeInfoFactory.getPrimitiveTypeInfo("bigint"),
TypeInfoFactory.getPrimitiveTypeInfo("boolean"),
TypeInfoFactory.getPrimitiveTypeInfo("float"),
TypeInfoFactory.getPrimitiveTypeInfo("double"),
TypeInfoFactory.getPrimitiveTypeInfo("binary"),
TypeInfoFactory.getPrimitiveTypeInfo("string")
};
// Build a fake Avro record with all types
Schema testSchema = buildPrimitiveAvroSchema();
List<Schema.Field> fields = testSchema.getFields();
for (int i = 0; i < fields.size(); i++) {
assertEquals(expectedTypes[i], NiFiOrcUtils.getOrcField(fields.get(i).schema()));
}
}
@Test
public void test_getOrcField_union_optional_type() throws Exception {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("union").type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault();
Schema testSchema = builder.endRecord();
TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema.getField("union").schema());
assertEquals(TypeInfoCreator.createBoolean(), orcType);
}
@Test
public void test_getOrcField_union() throws Exception {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("union").type().unionOf().intType().and().booleanType().endUnion().noDefault();
Schema testSchema = builder.endRecord();
TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema.getField("union").schema());
assertEquals(
TypeInfoFactory.getUnionTypeInfo(Arrays.asList(
TypeInfoCreator.createInt(),
TypeInfoCreator.createBoolean())),
orcType);
}
@Test
public void test_getOrcField_map() throws Exception {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("map").type().map().values().doubleType().noDefault();
Schema testSchema = builder.endRecord();
TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema.getField("map").schema());
assertEquals(
TypeInfoFactory.getMapTypeInfo(
TypeInfoCreator.createString(),
TypeInfoCreator.createDouble()),
orcType);
}
@Test
public void test_getOrcField_nested_map() throws Exception {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("map").type().map().values().map().values().doubleType().noDefault();
Schema testSchema = builder.endRecord();
TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema.getField("map").schema());
assertEquals(
TypeInfoFactory.getMapTypeInfo(TypeInfoCreator.createString(),
TypeInfoFactory.getMapTypeInfo(TypeInfoCreator.createString(), TypeInfoCreator.createDouble())),
orcType);
}
@Test
public void test_getOrcField_array() throws Exception {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("array").type().array().items().longType().noDefault();
Schema testSchema = builder.endRecord();
TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema.getField("array").schema());
assertEquals(
TypeInfoFactory.getListTypeInfo(TypeInfoCreator.createLong()),
orcType);
}
@Test
public void test_getOrcField_complex_array() throws Exception {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("array").type().array().items().map().values().floatType().noDefault();
Schema testSchema = builder.endRecord();
TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema.getField("array").schema());
assertEquals(
TypeInfoFactory.getListTypeInfo(TypeInfoFactory.getMapTypeInfo(TypeInfoCreator.createString(), TypeInfoCreator.createFloat())),
orcType);
}
@Test
public void test_getOrcField_record() throws Exception {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("int").type().intType().noDefault();
builder.name("long").type().longType().longDefault(1L);
builder.name("array").type().array().items().stringType().noDefault();
Schema testSchema = builder.endRecord();
TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema);
assertEquals(
TypeInfoFactory.getStructTypeInfo(
Arrays.asList("int", "long", "array"),
Arrays.asList(
TypeInfoCreator.createInt(),
TypeInfoCreator.createLong(),
TypeInfoFactory.getListTypeInfo(TypeInfoCreator.createString()))),
orcType);
}
@Test
public void test_getOrcField_enum() throws Exception {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("enumField").type().enumeration("enum").symbols("a", "b", "c").enumDefault("a");
Schema testSchema = builder.endRecord();
TypeInfo orcType = NiFiOrcUtils.getOrcField(testSchema.getField("enumField").schema());
assertEquals(TypeInfoCreator.createString(), orcType);
}
@Test
public void test_getPrimitiveOrcTypeFromPrimitiveAvroType() throws Exception {
// Expected ORC types
TypeInfo[] expectedTypes = {
TypeInfoCreator.createInt(),
TypeInfoCreator.createLong(),
TypeInfoCreator.createBoolean(),
TypeInfoCreator.createFloat(),
TypeInfoCreator.createDouble(),
TypeInfoCreator.createBinary(),
TypeInfoCreator.createString(),
};
Schema testSchema = buildPrimitiveAvroSchema();
List<Schema.Field> fields = testSchema.getFields();
for (int i = 0; i < fields.size(); i++) {
assertEquals(expectedTypes[i], NiFiOrcUtils.getPrimitiveOrcTypeFromPrimitiveAvroType(fields.get(i).schema().getType()));
}
}
@Test(expected = IllegalArgumentException.class)
public void test_getPrimitiveOrcTypeFromPrimitiveAvroType_badType() throws Exception {
Schema.Type nonPrimitiveType = Schema.Type.ARRAY;
NiFiOrcUtils.getPrimitiveOrcTypeFromPrimitiveAvroType(nonPrimitiveType);
}
@Test
public void test_getWritable() throws Exception {
assertTrue(NiFiOrcUtils.convertToORCObject(null, 1) instanceof IntWritable);
assertTrue(NiFiOrcUtils.convertToORCObject(null, 1L) instanceof LongWritable);
assertTrue(NiFiOrcUtils.convertToORCObject(null, 1.0f) instanceof FloatWritable);
assertTrue(NiFiOrcUtils.convertToORCObject(null, 1.0) instanceof DoubleWritable);
assertTrue(NiFiOrcUtils.convertToORCObject(null, new int[]{1, 2, 3}) instanceof List);
assertTrue(NiFiOrcUtils.convertToORCObject(null, Arrays.asList(1, 2, 3)) instanceof List);
Map<String, Float> map = new HashMap<>();
map.put("Hello", 1.0f);
map.put("World", 2.0f);
Object writable = NiFiOrcUtils.convertToORCObject(TypeInfoUtils.getTypeInfoFromTypeString("map<string,float>"), map);
assertTrue(writable instanceof MapWritable);
MapWritable mapWritable = (MapWritable) writable;
mapWritable.forEach((key, value) -> {
assertTrue(key instanceof Text);
assertTrue(value instanceof FloatWritable);
});
}
@Test
public void test_getHiveTypeFromAvroType_primitive() throws Exception {
// Expected ORC types
String[] expectedTypes = {
"INT",
"BIGINT",
"BOOLEAN",
"FLOAT",
"DOUBLE",
"BINARY",
"STRING",
};
Schema testSchema = buildPrimitiveAvroSchema();
List<Schema.Field> fields = testSchema.getFields();
for (int i = 0; i < fields.size(); i++) {
assertEquals(expectedTypes[i], NiFiOrcUtils.getHiveTypeFromAvroType(fields.get(i).schema()));
}
}
@Test
public void test_getHiveTypeFromAvroType_complex() throws Exception {
// Expected ORC types
String[] expectedTypes = {
"INT",
"MAP<STRING, DOUBLE>",
"STRING",
"UNIONTYPE<BIGINT, FLOAT>",
"ARRAY<INT>"
};
Schema testSchema = buildComplexAvroSchema();
List<Schema.Field> fields = testSchema.getFields();
for (int i = 0; i < fields.size(); i++) {
assertEquals(expectedTypes[i], NiFiOrcUtils.getHiveTypeFromAvroType(fields.get(i).schema()));
}
assertEquals("STRUCT<myInt:INT, myMap:MAP<STRING, DOUBLE>, myEnum:STRING, myLongOrFloat:UNIONTYPE<BIGINT, FLOAT>, myIntList:ARRAY<INT>>",
NiFiOrcUtils.getHiveTypeFromAvroType(testSchema));
}
@Test
public void test_generateHiveDDL_primitive() throws Exception {
Schema avroSchema = buildPrimitiveAvroSchema();
String ddl = NiFiOrcUtils.generateHiveDDL(avroSchema, "myHiveTable");
assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS myHiveTable (int INT, long BIGINT, boolean BOOLEAN, float FLOAT, double DOUBLE, bytes BINARY, string STRING)"
+ " STORED AS ORC", ddl);
}
@Test
public void test_generateHiveDDL_complex() throws Exception {
Schema avroSchema = buildComplexAvroSchema();
String ddl = NiFiOrcUtils.generateHiveDDL(avroSchema, "myHiveTable");
assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS myHiveTable "
+ "(myInt INT, myMap MAP<STRING, DOUBLE>, myEnum STRING, myLongOrFloat UNIONTYPE<BIGINT, FLOAT>, myIntList ARRAY<INT>)"
+ " STORED AS ORC", ddl);
}
//////////////////
// Helper methods
//////////////////
public static Schema buildPrimitiveAvroSchema() {
// Build a fake Avro record with all primitive types
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("test.record").namespace("any.data").fields();
builder.name("int").type().intType().noDefault();
builder.name("long").type().longType().longDefault(1L);
builder.name("boolean").type().booleanType().booleanDefault(true);
builder.name("float").type().floatType().floatDefault(0.0f);
builder.name("double").type().doubleType().doubleDefault(0.0);
builder.name("bytes").type().bytesType().noDefault();
builder.name("string").type().stringType().stringDefault("default");
return builder.endRecord();
}
public static GenericData.Record buildPrimitiveAvroRecord(int i, long l, boolean b, float f, double d, ByteBuffer bytes, String string) {
Schema schema = buildPrimitiveAvroSchema();
GenericData.Record row = new GenericData.Record(schema);
row.put("int", i);
row.put("long", l);
row.put("boolean", b);
row.put("float", f);
row.put("double", d);
row.put("bytes", bytes);
row.put("string", string);
return row;
}
public static TypeInfo buildPrimitiveOrcSchema() {
return TypeInfoFactory.getStructTypeInfo(Arrays.asList("int", "long", "boolean", "float", "double", "bytes", "string"),
Arrays.asList(
TypeInfoCreator.createInt(),
TypeInfoCreator.createLong(),
TypeInfoCreator.createBoolean(),
TypeInfoCreator.createFloat(),
TypeInfoCreator.createDouble(),
TypeInfoCreator.createBinary(),
TypeInfoCreator.createString()));
}
public static Schema buildComplexAvroSchema() {
// Build a fake Avro record with nested types
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("complex.record").namespace("any.data").fields();
builder.name("myInt").type().unionOf().nullType().and().intType().endUnion().nullDefault();
builder.name("myMap").type().map().values().doubleType().noDefault();
builder.name("myEnum").type().enumeration("myEnum").symbols("ABC", "DEF", "XYZ").enumDefault("ABC");
builder.name("myLongOrFloat").type().unionOf().longType().and().floatType().endUnion().noDefault();
builder.name("myIntList").type().array().items().intType().noDefault();
return builder.endRecord();
}
public static GenericData.Record buildComplexAvroRecord(Integer i, Map<String, Double> m, String e, Object unionVal, List<Integer> intArray) {
Schema schema = buildComplexAvroSchema();
GenericData.Record row = new GenericData.Record(schema);
row.put("myInt", i);
row.put("myMap", m);
row.put("myEnum", e);
row.put("myLongOrFloat", unionVal);
row.put("myIntList", intArray);
return row;
}
public static TypeInfo buildComplexOrcSchema() {
return TypeInfoUtils.getTypeInfoFromTypeString("struct<myInt:int,myMap:map<string,double>,myEnum:string,myLongOrFloat:uniontype<int>,myIntList:array<int>>");
}
private static class TypeInfoCreator {
static TypeInfo createInt() {
return TypeInfoFactory.getPrimitiveTypeInfo("int");
}
static TypeInfo createLong() {
return TypeInfoFactory.getPrimitiveTypeInfo("bigint");
}
static TypeInfo createBoolean() {
return TypeInfoFactory.getPrimitiveTypeInfo("boolean");
}
static TypeInfo createFloat() {
return TypeInfoFactory.getPrimitiveTypeInfo("float");
}
static TypeInfo createDouble() {
return TypeInfoFactory.getPrimitiveTypeInfo("double");
}
static TypeInfo createBinary() {
return TypeInfoFactory.getPrimitiveTypeInfo("binary");
}
static TypeInfo createString() {
return TypeInfoFactory.getPrimitiveTypeInfo("string");
}
}
}