NIFI-1663: Add ConvertAvroToORC processor

- Code review changes
 - This closes #477.
Matt Burgess 2016-06-15 09:12:57 -04:00 committed by Mark Payne
parent f43f47694c
commit d6391652e0
10 changed files with 4548 additions and 46 deletions

View File

@@ -84,6 +84,18 @@ The following binary components are provided under the Apache Software License v
This project includes software copyrighted by Dell SecureWorks and
licensed under the Apache License, Version 2.0.
(ASLv2) Apache ORC
The following NOTICE information applies:
Apache ORC
Copyright 2013-2015 The Apache Software Foundation
This product includes software developed by The Apache Software
Foundation (http://www.apache.org/).
This product includes software developed by Hewlett-Packard:
(c) Copyright [2014-2015] Hewlett-Packard Development Company, L.P
(ASLv2) Jackson JSON processor
The following NOTICE information applies:
# Jackson JSON processor

View File

@@ -27,6 +27,7 @@
<properties>
<hive.version>2.0.0</hive.version>
<orc.version>1.1.1</orc.version>
</properties>
@@ -44,6 +45,10 @@
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-orc</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
@@ -130,6 +135,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
<version>${orc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>

View File

@@ -17,10 +17,7 @@
package org.apache.nifi.dbcp.hive;
import org.apache.commons.dbcp.BasicDataSource;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.jdbc.HiveDriver;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -30,7 +27,6 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.hadoop.KerberosProperties;
@@ -40,8 +36,8 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.hive.HiveJdbcCommon;
import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.sql.Connection;
@@ -78,7 +74,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
.description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
+ "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
+ "with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
.required(false).addValidator(createMultipleFilesExistValidator()).build();
.required(false).addValidator(HiveJdbcCommon.createMultipleFilesExistValidator()).build();
public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
.name("hive-db-user")
@@ -170,7 +166,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
// then load the Configuration and set the new resources in the holder
if (resources == null || !configFiles.equals(resources.getConfigResources())) {
getLogger().debug("Reloading validation resources");
resources = new ValidationResources(configFiles, getConfigurationFromFiles(configFiles));
resources = new ValidationResources(configFiles, HiveJdbcCommon.getConfigurationFromFiles(configFiles));
validationResourceHolder.set(resources);
}
@@ -185,16 +181,6 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
return problems;
}
protected Configuration getConfigurationFromFiles(final String configFiles) {
final Configuration hiveConfig = new HiveConf();
if (StringUtils.isNotBlank(configFiles)) {
for (final String configFile : configFiles.split(",")) {
hiveConfig.addResource(new Path(configFile.trim()));
}
}
return hiveConfig;
}
/**
* Configures connection pool by creating an instance of the
* {@link BasicDataSource} based on configuration provided with
@@ -213,7 +199,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
connectionUrl = context.getProperty(DATABASE_URL).getValue();
final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
final Configuration hiveConfig = getConfigurationFromFiles(configFiles);
final Configuration hiveConfig = HiveJdbcCommon.getConfigurationFromFiles(configFiles);
// add any dynamic properties to the Hive configuration
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
@@ -299,34 +285,6 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
return "HiveConnectionPool[id=" + getIdentifier() + "]";
}
/**
* Validates that one or more files exist, as specified in a single property.
*/
public static Validator createMultipleFilesExistValidator() {
return new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
final String[] files = input.split(",");
for (String filename : files) {
try {
final File file = new File(filename.trim());
final boolean valid = file.exists() && file.isFile();
if (!valid) {
final String message = "File " + file + " does not exist or is not a file";
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
}
} catch (SecurityException e) {
final String message = "Unable to access " + filename + " due to " + e.getMessage();
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
}
}
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
}
};
}
@Override
public String getConnectionURL() {
return connectionUrl;
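
For reference, a minimal sketch (not part of this commit) of calling the relocated helper directly; the file paths are hypothetical, and in the pool they come from the Hive Configuration Resources property described above:

import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.util.hive.HiveJdbcCommon;

// Builds a Hive-aware Configuration from a comma-separated list of config files;
// a blank input falls back to the defaults of HiveConf.
Configuration hiveConfig = HiveJdbcCommon.getConfigurationFromFiles(
        "/etc/hive/conf/hive-site.xml,/etc/hadoop/conf/core-site.xml");  // hypothetical paths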

View File

@@ -0,0 +1,309 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hive;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.hive.HiveJdbcCommon;
import org.apache.nifi.util.orc.OrcFlowFileWriter;
import org.apache.nifi.util.orc.OrcUtils;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
* The ConvertAvroToORC processor takes an Avro-formatted flow file as input and converts it into ORC format.
*/
@SideEffectFree
@SupportsBatching
@Tags({"avro", "orc", "hive", "convert"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Converts an Avro record into ORC file format. This processor provides a direct mapping of an Avro record to an ORC record, such "
+ "that the resulting ORC file will have the same hierarchical structure as the Avro document. If an incoming FlowFile contains a stream of "
+ "multiple Avro records, the resultant FlowFile will contain a ORC file containing all of the Avro records. If an incoming FlowFile does "
+ "not contain any records, an empty ORC file is the output.")
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "Sets the mime type to application/octet-stream"),
@WritesAttribute(attribute = "filename", description = "Sets the filename to the existing filename with the extension replaced by / added to by .orc"),
@WritesAttribute(attribute = "record.count", description = "Sets the number of records in the ORC file."),
@WritesAttribute(attribute = "hive.ddl", description = "Creates a partial Hive DDL statement for creating a table in Hive from this ORC file. "
+ "This can be used in ReplaceText for setting the content to the DDL. To make it valid DDL, add \"LOCATION '<path_to_orc_file_in_hdfs>'\", where "
+ "the path is the directory that contains this ORC file on HDFS. For example, ConvertAvroToORC can send flow files to a PutHDFS processor to send the file to "
+ "HDFS, then to a ReplaceText to set the content to this DDL (plus the LOCATION clause as described), then to PutHiveQL processor to create the table "
+ "if it doesn't exist.")
})
public class ConvertAvroToORC extends AbstractProcessor {
// Attributes
public static final String ORC_MIME_TYPE = "application/octet-stream";
public static final String HIVE_DDL_ATTRIBUTE = "hive.ddl";
public static final String RECORD_COUNT_ATTRIBUTE = "record.count";
// Properties
public static final PropertyDescriptor ORC_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
.name("orc-config-resources")
.displayName("ORC Configuration Resources")
.description("A file or comma separated list of files which contains the ORC configuration (hive-site.xml, e.g.). Without this, Hadoop "
+ "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Please see the ORC documentation for more details.")
.required(false).addValidator(HiveJdbcCommon.createMultipleFilesExistValidator()).build();
public static final PropertyDescriptor STRIPE_SIZE = new PropertyDescriptor.Builder()
.name("orc-stripe-size")
.displayName("Stripe Size")
.description("The size of the memory buffer (in bytes) for writing stripes to an ORC file")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("100 KB")
.build();
public static final PropertyDescriptor BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("orc-buffer-size")
.displayName("Buffer Size")
.description("The maximum size of the memory buffers (in bytes) used for compressing and storing a stripe in memory. This is a hint to the ORC writer, "
+ "which may choose to use a smaller buffer size based on stripe size and number of columns for efficient stripe writing and memory utilization.")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("10 KB")
.build();
public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
.name("orc-compression-type")
.displayName("Compression Type")
.required(true)
.allowableValues("NONE", "ZLIB", "SNAPPY", "LZO")
.defaultValue("NONE")
.build();
public static final PropertyDescriptor HIVE_TABLE_NAME = new PropertyDescriptor.Builder()
.name("orc-hive-table-name")
.displayName("Hive Table Name")
.description("An optional table name to insert into the hive.ddl attribute. The generated DDL can be used by "
+ "a PutHiveQL processor (presumably after a PutHDFS processor) to create a table backed by the converted ORC file. "
+ "If this property is not provided, the full name (including namespace) of the incoming Avro record will be normalized "
+ "and used as the table name.")
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
// Relationships
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after it has been converted to ORC format.")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to ORC for any reason")
.build();
private final static List<PropertyDescriptor> propertyDescriptors;
private final static Set<Relationship> relationships;
private volatile Configuration orcConfig;
/*
* Will ensure that the list of property descriptors is built only once.
* Will also create a Set of relationships
*/
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.add(ORC_CONFIGURATION_RESOURCES);
_propertyDescriptors.add(STRIPE_SIZE);
_propertyDescriptors.add(BUFFER_SIZE);
_propertyDescriptors.add(COMPRESSION_TYPE);
_propertyDescriptors.add(HIVE_TABLE_NAME);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
_relationships.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(_relationships);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@OnScheduled
public void setup(ProcessContext context) {
boolean confFileProvided = context.getProperty(ORC_CONFIGURATION_RESOURCES).isSet();
if (confFileProvided) {
final String configFiles = context.getProperty(ORC_CONFIGURATION_RESOURCES).getValue();
orcConfig = HiveJdbcCommon.getConfigurationFromFiles(configFiles);
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
try {
long startTime = System.currentTimeMillis();
final long stripeSize = context.getProperty(STRIPE_SIZE).asDataSize(DataUnit.B).longValue();
final int bufferSize = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final CompressionKind compressionType = CompressionKind.valueOf(context.getProperty(COMPRESSION_TYPE).getValue());
final AtomicReference<Schema> hiveAvroSchema = new AtomicReference<>(null);
final AtomicInteger totalRecordCount = new AtomicInteger(0);
final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn);
final OutputStream out = new BufferedOutputStream(rawOut);
final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<>())) {
// Create ORC schema from Avro schema
Schema avroSchema = reader.getSchema();
TypeDescription orcSchema = OrcUtils.getOrcField(avroSchema);
if (orcConfig == null) {
orcConfig = new Configuration();
}
OrcFile.WriterOptions options = OrcFile.writerOptions(orcConfig)
.setSchema(orcSchema)
.stripeSize(stripeSize)
.bufferSize(bufferSize)
.compress(compressionType)
.version(OrcFile.Version.CURRENT);
OrcFlowFileWriter orcWriter = new OrcFlowFileWriter(out, new Path(fileName), options);
try {
VectorizedRowBatch batch = orcSchema.createRowBatch();
int recordCount = 0;
int recordsInBatch = 0;
GenericRecord currRecord = null;
while (reader.hasNext()) {
currRecord = reader.next(currRecord);
List<Schema.Field> fields = currRecord.getSchema().getFields();
if (fields != null) {
MutableInt[] vectorOffsets = new MutableInt[fields.size()];
for (int i = 0; i < fields.size(); i++) {
vectorOffsets[i] = new MutableInt(0);
Schema.Field field = fields.get(i);
Schema fieldSchema = field.schema();
Object o = currRecord.get(field.name());
try {
OrcUtils.putToRowBatch(batch.cols[i], vectorOffsets[i], recordsInBatch, fieldSchema, o);
} catch (ArrayIndexOutOfBoundsException aioobe) {
getLogger().error("Index out of bounds at record {} for column {}, type {}, and object {}",
new Object[]{recordsInBatch, i, fieldSchema.getType().getName(), o.toString()},
aioobe);
throw new IOException(aioobe);
}
}
}
recordCount++;
recordsInBatch++;
if (recordsInBatch == batch.getMaxSize()) {
// add batch and start a new one
batch.size = recordsInBatch;
orcWriter.addRowBatch(batch);
batch = orcSchema.createRowBatch();
recordsInBatch = 0;
}
}
// If there are records in the batch, add the batch
if (recordsInBatch > 0) {
batch.size = recordsInBatch;
orcWriter.addRowBatch(batch);
}
hiveAvroSchema.set(avroSchema);
totalRecordCount.set(recordCount);
} finally {
// finished writing this record, close the writer (which will flush to the flow file)
orcWriter.close();
}
}
}
});
final String hiveTableName = context.getProperty(HIVE_TABLE_NAME).isSet()
? context.getProperty(HIVE_TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue()
: OrcUtils.normalizeHiveTableName(hiveAvroSchema.get().getFullName());
String hiveDDL = OrcUtils.generateHiveDDL(hiveAvroSchema.get(), hiveTableName);
// Add attributes and transfer to success
flowFile = session.putAttribute(flowFile, RECORD_COUNT_ATTRIBUTE, Integer.toString(totalRecordCount.get()));
flowFile = session.putAttribute(flowFile, HIVE_DDL_ATTRIBUTE, hiveDDL);
StringBuilder newFilename = new StringBuilder();
int extensionIndex = fileName.lastIndexOf(".");
if (extensionIndex != -1) {
newFilename.append(fileName.substring(0, extensionIndex));
} else {
newFilename.append(fileName);
}
newFilename.append(".orc");
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), ORC_MIME_TYPE);
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename.toString());
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().modifyContent(flowFile, "Converted "+totalRecordCount.get()+" records", System.currentTimeMillis() - startTime);
} catch (final ProcessException pe) {
getLogger().error("Failed to convert {} from Avro to ORC due to {}; transferring to failure", new Object[]{flowFile, pe});
session.transfer(flowFile, REL_FAILURE);
}
}
}
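
To illustrate the flow sketched in the hive.ddl attribute documentation above, here is a rough, hypothetical fragment (not part of this commit) that exercises the processor with NiFi's test framework and appends a LOCATION clause to the generated DDL, the way ReplaceText would in a real flow. It assumes it lives in the org.apache.nifi.processors.hive package (so REL_SUCCESS is visible); avroFileBytes, the table name, and the HDFS path are placeholders:

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToORC());
runner.setProperty(ConvertAvroToORC.COMPRESSION_TYPE, "ZLIB");
runner.setProperty(ConvertAvroToORC.HIVE_TABLE_NAME, "weather");   // hypothetical table name
runner.enqueue(avroFileBytes);                                      // avroFileBytes: an Avro datafile, assumed available
runner.run();

MockFlowFile orc = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0);
// hive.ddl holds e.g. "CREATE EXTERNAL TABLE IF NOT EXISTS weather (...) STORED AS ORC";
// appending the HDFS directory that will hold the ORC file completes the statement:
String ddl = orc.getAttribute(ConvertAvroToORC.HIVE_DDL_ATTRIBUTE)
        + " LOCATION '/apps/hive/warehouse/weather'";               // hypothetical HDFS directory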

View File

@@ -26,7 +26,14 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigDecimal;
@@ -327,4 +334,42 @@ public class HiveJdbcCommon {
public interface ResultSetRowCallback {
void processRow(ResultSet resultSet) throws IOException;
}
/**
* Validates that one or more files exist, as specified in a single property.
*/
public static Validator createMultipleFilesExistValidator() {
return new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
final String[] files = input.split(",");
for (String filename : files) {
try {
final File file = new File(filename.trim());
final boolean valid = file.exists() && file.isFile();
if (!valid) {
final String message = "File " + file + " does not exist or is not a file";
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
}
} catch (SecurityException e) {
final String message = "Unable to access " + filename + " due to " + e.getMessage();
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
}
}
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
}
};
}
public static Configuration getConfigurationFromFiles(final String configFiles) {
final Configuration hiveConfig = new HiveConf();
if (StringUtils.isNotBlank(configFiles)) {
for (final String configFile : configFiles.split(",")) {
hiveConfig.addResource(new Path(configFile.trim()));
}
}
return hiveConfig;
}
}
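
A small sketch (illustrative only) of how the relocated validator behaves: each entry in the comma-separated list must point at an existing regular file. The ValidationContext is normally supplied by the NiFi framework, so it is taken as a parameter here:

import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.util.hive.HiveJdbcCommon;

// Returns true only when every entry in the comma-separated list is an existing file.
static boolean configFilesExist(final String configFiles, final ValidationContext context) {
    final ValidationResult result = HiveJdbcCommon.createMultipleFilesExistValidator()
            .validate("Hive Configuration Resources", configFiles, context);
    return result.isValid();
}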

View File

@@ -0,0 +1,408 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util.orc;
import org.apache.avro.Schema;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.orc.TypeDescription;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Utility methods for ORC support (conversion from Avro, conversion to Hive types, etc.).
*/
public class OrcUtils {
public static void putToRowBatch(ColumnVector col, MutableInt vectorOffset, int rowNumber, Schema fieldSchema, Object o) {
Schema.Type fieldType = fieldSchema.getType();
if (fieldType == null) {
throw new IllegalArgumentException("Field type is null");
}
if (o == null) {
col.isNull[rowNumber] = true;
} else {
switch (fieldType) {
case INT:
((LongColumnVector) col).vector[rowNumber] = (int) o;
break;
case LONG:
((LongColumnVector) col).vector[rowNumber] = (long) o;
break;
case BOOLEAN:
((LongColumnVector) col).vector[rowNumber] = ((boolean) o) ? 1 : 0;
break;
case BYTES:
ByteBuffer byteBuffer = ((ByteBuffer) o);
int size = byteBuffer.remaining();
byte[] buf = new byte[size];
byteBuffer.get(buf, 0, size);
((BytesColumnVector) col).setVal(rowNumber, buf);
break;
case DOUBLE:
((DoubleColumnVector) col).vector[rowNumber] = (double) o;
break;
case FLOAT:
((DoubleColumnVector) col).vector[rowNumber] = (float) o;
break;
case STRING:
case ENUM:
((BytesColumnVector) col).setVal(rowNumber, o.toString().getBytes());
break;
case UNION:
// If the union only has one non-null type in it, it was flattened in the ORC schema
if (col instanceof UnionColumnVector) {
UnionColumnVector union = ((UnionColumnVector) col);
Schema.Type avroType = OrcUtils.getAvroSchemaTypeOfObject(o);
// Find the index in the union with the matching Avro type
int unionIndex = -1;
List<Schema> types = fieldSchema.getTypes();
final int numFields = types.size();
for (int i = 0; i < numFields && unionIndex == -1; i++) {
if (avroType.equals(types.get(i).getType())) {
unionIndex = i;
}
}
if (unionIndex == -1) {
throw new IllegalArgumentException("Object type " + avroType.getName() + " not found in union '" + fieldSchema.getName() + "'");
}
// Need nested vector offsets
MutableInt unionVectorOffset = new MutableInt(0);
putToRowBatch(union.fields[unionIndex], unionVectorOffset, rowNumber, fieldSchema.getTypes().get(unionIndex), o);
} else {
// Find and use the non-null type from the union
List<Schema> types = fieldSchema.getTypes();
Schema effectiveType = null;
for (Schema type : types) {
if (!Schema.Type.NULL.equals(type.getType())) {
effectiveType = type;
break;
}
}
putToRowBatch(col, vectorOffset, rowNumber, effectiveType, o);
}
break;
case ARRAY:
Schema arrayType = fieldSchema.getElementType();
ListColumnVector array = ((ListColumnVector) col);
if (o instanceof int[] || o instanceof long[]) {
int length = (o instanceof int[]) ? ((int[]) o).length : ((long[]) o).length;
for (int i = 0; i < length; i++) {
((LongColumnVector) array.child).vector[vectorOffset.getValue() + i] =
(o instanceof int[]) ? ((int[]) o)[i] : ((long[]) o)[i];
}
array.offsets[rowNumber] = vectorOffset.longValue();
array.lengths[rowNumber] = length;
vectorOffset.add(length);
} else if (o instanceof float[]) {
float[] floatArray = (float[]) o;
for (int i = 0; i < floatArray.length; i++) {
((DoubleColumnVector) array.child).vector[vectorOffset.getValue() + i] = floatArray[i];
}
array.offsets[rowNumber] = vectorOffset.longValue();
array.lengths[rowNumber] = floatArray.length;
vectorOffset.add(floatArray.length);
} else if (o instanceof double[]) {
double[] doubleArray = (double[]) o;
for (int i = 0; i < doubleArray.length; i++) {
((DoubleColumnVector) array.child).vector[vectorOffset.getValue() + i] = doubleArray[i];
}
array.offsets[rowNumber] = vectorOffset.longValue();
array.lengths[rowNumber] = doubleArray.length;
vectorOffset.add(doubleArray.length);
} else if (o instanceof String[]) {
String[] stringArray = (String[]) o;
BytesColumnVector byteCol = ((BytesColumnVector) array.child);
for (int i = 0; i < stringArray.length; i++) {
if (stringArray[i] == null) {
byteCol.isNull[rowNumber] = true;
} else {
byteCol.setVal(vectorOffset.getValue() + i, stringArray[i].getBytes());
}
}
array.offsets[rowNumber] = vectorOffset.longValue();
array.lengths[rowNumber] = stringArray.length;
vectorOffset.add(stringArray.length);
} else if (o instanceof Map[]) {
Map[] mapArray = (Map[]) o;
MutableInt mapVectorOffset = new MutableInt(0);
for (int i = 0; i < mapArray.length; i++) {
if (mapArray[i] == null) {
array.child.isNull[rowNumber] = true;
} else {
putToRowBatch(array.child, mapVectorOffset, vectorOffset.getValue() + i, arrayType, mapArray[i]);
}
}
array.offsets[rowNumber] = vectorOffset.longValue();
array.lengths[rowNumber] = mapArray.length;
vectorOffset.add(mapArray.length);
} else if (o instanceof List) {
List listArray = (List) o;
MutableInt listVectorOffset = new MutableInt(0);
int numElements = listArray.size();
for (int i = 0; i < numElements; i++) {
if (listArray.get(i) == null) {
array.child.isNull[rowNumber] = true;
} else {
putToRowBatch(array.child, listVectorOffset, vectorOffset.getValue() + i, arrayType, listArray.get(i));
}
}
array.offsets[rowNumber] = vectorOffset.longValue();
array.lengths[rowNumber] = numElements;
vectorOffset.add(numElements);
} else {
throw new IllegalArgumentException("Object class " + o.getClass().getName() + " not supported as an ORC list/array");
}
break;
case MAP:
MapColumnVector map = ((MapColumnVector) col);
// Avro maps require String keys
@SuppressWarnings("unchecked")
Map<String, ?> mapObj = (Map<String, ?>) o;
int effectiveRowNumber = vectorOffset.getValue();
for (Map.Entry<String, ?> entry : mapObj.entrySet()) {
putToRowBatch(map.keys, vectorOffset, effectiveRowNumber, Schema.create(Schema.Type.STRING), entry.getKey());
putToRowBatch(map.values, vectorOffset, effectiveRowNumber, fieldSchema.getValueType(), entry.getValue());
effectiveRowNumber++;
}
map.offsets[rowNumber] = vectorOffset.longValue();
map.lengths[rowNumber] = mapObj.size();
vectorOffset.add(mapObj.size());
break;
default:
throw new IllegalArgumentException("Field type " + fieldType.getName() + " not recognized");
}
}
}
public static String normalizeHiveTableName(String name) {
return name.replaceAll("[\\. ]", "_");
}
public static String generateHiveDDL(Schema avroSchema, String tableName) {
Schema.Type schemaType = avroSchema.getType();
StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT EXISTS ");
sb.append(tableName);
sb.append(" (");
if (Schema.Type.RECORD.equals(schemaType)) {
List<String> hiveColumns = new ArrayList<>();
List<Schema.Field> fields = avroSchema.getFields();
if (fields != null) {
hiveColumns.addAll(
fields.stream().map(field -> field.name() + " " + getHiveTypeFromAvroType(field.schema())).collect(Collectors.toList()));
}
sb.append(StringUtils.join(hiveColumns, ", "));
sb.append(") STORED AS ORC");
return sb.toString();
} else {
throw new IllegalArgumentException("Avro schema is of type " + schemaType.getName() + ", not RECORD");
}
}
public static void addOrcField(TypeDescription orcSchema, Schema.Field avroField) {
Schema fieldSchema = avroField.schema();
String fieldName = avroField.name();
orcSchema.addField(fieldName, getOrcField(fieldSchema));
}
public static TypeDescription getOrcField(Schema fieldSchema) throws IllegalArgumentException {
Schema.Type fieldType = fieldSchema.getType();
switch (fieldType) {
case INT:
case LONG:
case BOOLEAN:
case BYTES:
case DOUBLE:
case FLOAT:
case STRING:
return getPrimitiveOrcTypeFromPrimitiveAvroType(fieldType);
case UNION:
List<Schema> unionFieldSchemas = fieldSchema.getTypes();
TypeDescription unionSchema = TypeDescription.createUnion();
if (unionFieldSchemas != null) {
// Ignore null types in union
List<TypeDescription> orcFields = unionFieldSchemas.stream().filter(
unionFieldSchema -> !Schema.Type.NULL.equals(unionFieldSchema.getType())).map(OrcUtils::getOrcField).collect(Collectors.toList());
// Flatten the field if the union only has one non-null element
if (orcFields.size() == 1) {
return orcFields.get(0);
} else {
orcFields.forEach(unionSchema::addUnionChild);
}
}
return unionSchema;
case ARRAY:
return TypeDescription.createList(getOrcField(fieldSchema.getElementType()));
case MAP:
return TypeDescription.createMap(TypeDescription.createString(), getOrcField(fieldSchema.getValueType()));
case RECORD:
TypeDescription record = TypeDescription.createStruct();
List<Schema.Field> avroFields = fieldSchema.getFields();
if (avroFields != null) {
avroFields.forEach(avroField -> addOrcField(record, avroField));
}
return record;
case ENUM:
// An enum value is just a String for ORC/Hive
return TypeDescription.createString();
default:
throw new IllegalArgumentException("Did not recognize Avro type " + fieldType.getName());
}
}
public static Schema.Type getAvroSchemaTypeOfObject(Object o) {
if (o == null) {
return Schema.Type.NULL;
} else if (o instanceof Integer) {
return Schema.Type.INT;
} else if (o instanceof Long) {
return Schema.Type.LONG;
} else if (o instanceof Boolean) {
return Schema.Type.BOOLEAN;
} else if (o instanceof byte[]) {
return Schema.Type.BYTES;
} else if (o instanceof Float) {
return Schema.Type.FLOAT;
} else if (o instanceof Double) {
return Schema.Type.DOUBLE;
} else if (o instanceof Enum) {
return Schema.Type.ENUM;
} else if (o instanceof Object[]) {
return Schema.Type.ARRAY;
} else if (o instanceof List) {
return Schema.Type.ARRAY;
} else if (o instanceof Map) {
return Schema.Type.MAP;
} else {
throw new IllegalArgumentException("Object of class " + o.getClass() + " is not a supported Avro Type");
}
}
public static TypeDescription getPrimitiveOrcTypeFromPrimitiveAvroType(Schema.Type avroType) throws IllegalArgumentException {
if (avroType == null) {
throw new IllegalArgumentException("Avro type is null");
}
switch (avroType) {
case INT:
return TypeDescription.createInt();
case LONG:
return TypeDescription.createLong();
case BOOLEAN:
return TypeDescription.createBoolean();
case BYTES:
return TypeDescription.createBinary();
case DOUBLE:
return TypeDescription.createDouble();
case FLOAT:
return TypeDescription.createFloat();
case STRING:
return TypeDescription.createString();
default:
throw new IllegalArgumentException("Avro type " + avroType.getName() + " is not a primitive type");
}
}
public static String getHiveTypeFromAvroType(Schema avroSchema) {
if (avroSchema == null) {
throw new IllegalArgumentException("Avro schema is null");
}
Schema.Type avroType = avroSchema.getType();
switch (avroType) {
case INT:
return "INT";
case LONG:
return "BIGINT";
case BOOLEAN:
return "BOOLEAN";
case BYTES:
return "BINARY";
case DOUBLE:
return "DOUBLE";
case FLOAT:
return "FLOAT";
case STRING:
case ENUM:
return "STRING";
case UNION:
List<Schema> unionFieldSchemas = avroSchema.getTypes();
if (unionFieldSchemas != null) {
List<String> hiveFields = new ArrayList<>();
for (Schema unionFieldSchema : unionFieldSchemas) {
Schema.Type unionFieldSchemaType = unionFieldSchema.getType();
// Ignore null types in union
if (!Schema.Type.NULL.equals(unionFieldSchemaType)) {
hiveFields.add(getHiveTypeFromAvroType(unionFieldSchema));
}
}
// Flatten the field if the union only has one non-null element
return (hiveFields.size() == 1)
? hiveFields.get(0)
: "UNIONTYPE<" + StringUtils.join(hiveFields, ", ") + ">";
}
break;
case MAP:
return "MAP<STRING, " + getHiveTypeFromAvroType(avroSchema.getValueType()) + ">";
case ARRAY:
return "ARRAY<" + getHiveTypeFromAvroType(avroSchema.getElementType()) + ">";
case RECORD:
List<Schema.Field> recordFields = avroSchema.getFields();
if (recordFields != null) {
List<String> hiveFields = recordFields.stream().map(
recordField -> recordField.name() + ":" + getHiveTypeFromAvroType(recordField.schema())).collect(Collectors.toList());
return "STRUCT<" + StringUtils.join(hiveFields, ", ") + ">";
}
break;
default:
break;
}
throw new IllegalArgumentException("Error converting Avro type " + avroType.getName() + " to Hive type");
}
}
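
To make the Avro-to-ORC mapping concrete, a short sketch (not part of this commit) that converts a hypothetical two-field record schema; the results noted in the comments follow directly from getOrcField, normalizeHiveTableName, and generateHiveDDL above:

import org.apache.avro.Schema;
import org.apache.nifi.util.orc.OrcUtils;
import org.apache.orc.TypeDescription;

Schema avroSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"person\",\"namespace\":\"example.data\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"}]}");

// A RECORD becomes an ORC struct: struct<id:int,name:string>
TypeDescription orcSchema = OrcUtils.getOrcField(avroSchema);

// "example.data.person" -> "example_data_person" (dots and spaces become underscores)
String tableName = OrcUtils.normalizeHiveTableName(avroSchema.getFullName());

// "CREATE EXTERNAL TABLE IF NOT EXISTS example_data_person (id INT, name STRING) STORED AS ORC"
String hiveDdl = OrcUtils.generateHiveDDL(avroSchema, tableName);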

View File

@@ -14,3 +14,4 @@
# limitations under the License.
org.apache.nifi.processors.hive.SelectHiveQL
org.apache.nifi.processors.hive.PutHiveQL
org.apache.nifi.processors.hive.ConvertAvroToORC

View File

@@ -0,0 +1,260 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.hive;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.orc.TestOrcUtils;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for ConvertAvroToORC processor
*/
public class TestConvertAvroToORC {
private ConvertAvroToORC processor;
private TestRunner runner;
@Before
public void setUp() throws Exception {
processor = new ConvertAvroToORC();
runner = TestRunners.newTestRunner(processor);
}
@Test
public void test_Setup() throws Exception {
}
@Test
public void test_onTrigger_primitive_record() throws Exception {
GenericData.Record record = TestOrcUtils.buildPrimitiveAvroRecord(10, 20L, true, 30.0f, 40, StandardCharsets.UTF_8.encode("Hello"), "World");
DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(record.getSchema());
DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer);
ByteArrayOutputStream out = new ByteArrayOutputStream();
fileWriter.create(record.getSchema(), out);
fileWriter.append(record);
// Put another record in
record = TestOrcUtils.buildPrimitiveAvroRecord(1, 2L, false, 3.0f, 4L, StandardCharsets.UTF_8.encode("I am"), "another record");
fileWriter.append(record);
// And one more
record = TestOrcUtils.buildPrimitiveAvroRecord(100, 200L, true, 300.0f, 400L, StandardCharsets.UTF_8.encode("Me"), "too!");
fileWriter.append(record);
fileWriter.flush();
fileWriter.close();
out.close();
Map<String,String> attributes = new HashMap<String,String>(){{
put(CoreAttributes.FILENAME.key(), "test.avro");
}};
runner.enqueue(out.toByteArray(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1);
// Write the flow file out to disk, since the ORC Reader needs a path
MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0);
assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS test_record (int INT, long BIGINT, boolean BOOLEAN, float FLOAT, double DOUBLE, bytes BINARY, string STRING)"
+ " STORED AS ORC", resultFlowFile.getAttribute(ConvertAvroToORC.HIVE_DDL_ATTRIBUTE));
assertEquals("3", resultFlowFile.getAttribute(ConvertAvroToORC.RECORD_COUNT_ATTRIBUTE));
assertEquals("test.orc", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
FileOutputStream fos = new FileOutputStream("target/test1.orc");
fos.write(resultContents);
fos.flush();
fos.close();
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
Reader reader = OrcFile.createReader(new Path("target/test1.orc"), OrcFile.readerOptions(conf).filesystem(fs));
RecordReader rows = reader.rows();
VectorizedRowBatch batch = reader.getSchema().createRowBatch();
assertTrue(rows.nextBatch(batch));
assertTrue(batch.cols[0] instanceof LongColumnVector);
assertEquals(10, ((LongColumnVector) batch.cols[0]).vector[0]);
assertEquals(1, ((LongColumnVector) batch.cols[0]).vector[1]);
assertEquals(100, ((LongColumnVector) batch.cols[0]).vector[2]);
assertTrue(batch.cols[1] instanceof LongColumnVector);
assertEquals(20, ((LongColumnVector) batch.cols[1]).vector[0]);
assertEquals(2, ((LongColumnVector) batch.cols[1]).vector[1]);
assertEquals(200, ((LongColumnVector) batch.cols[1]).vector[2]);
assertTrue(batch.cols[2] instanceof LongColumnVector);
assertEquals(1, ((LongColumnVector) batch.cols[2]).vector[0]);
assertEquals(0, ((LongColumnVector) batch.cols[2]).vector[1]);
assertEquals(1, ((LongColumnVector) batch.cols[2]).vector[2]);
assertTrue(batch.cols[3] instanceof DoubleColumnVector);
assertEquals(30.0f, ((DoubleColumnVector) batch.cols[3]).vector[0], Double.MIN_NORMAL);
assertEquals(3.0f, ((DoubleColumnVector) batch.cols[3]).vector[1], Double.MIN_NORMAL);
assertEquals(300.0f, ((DoubleColumnVector) batch.cols[3]).vector[2], Double.MIN_NORMAL);
assertTrue(batch.cols[4] instanceof DoubleColumnVector);
assertEquals(40.0f, ((DoubleColumnVector) batch.cols[4]).vector[0], Double.MIN_NORMAL);
assertEquals(4.0f, ((DoubleColumnVector) batch.cols[4]).vector[1], Double.MIN_NORMAL);
assertEquals(400.0f, ((DoubleColumnVector) batch.cols[4]).vector[2], Double.MIN_NORMAL);
assertTrue(batch.cols[5] instanceof BytesColumnVector);
assertEquals("Hello", ((BytesColumnVector) batch.cols[5]).toString(0));
assertEquals("I am", ((BytesColumnVector) batch.cols[5]).toString(1));
assertEquals("Me", ((BytesColumnVector) batch.cols[5]).toString(2));
assertTrue(batch.cols[6] instanceof BytesColumnVector);
assertEquals("World", ((BytesColumnVector) batch.cols[6]).toString(0));
assertEquals("another record", ((BytesColumnVector) batch.cols[6]).toString(1));
assertEquals("too!", ((BytesColumnVector) batch.cols[6]).toString(2));
}
@Test
public void test_onTrigger_complex_record() throws Exception {
Map<String, Double> mapData1 = new TreeMap<String, Double>() {{
put("key1", 1.0);
put("key2", 2.0);
}};
GenericData.Record record = TestOrcUtils.buildComplexAvroRecord(10, mapData1, "DEF", 3.0f, Arrays.asList(10, 20));
DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(record.getSchema());
DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer);
ByteArrayOutputStream out = new ByteArrayOutputStream();
fileWriter.create(record.getSchema(), out);
fileWriter.append(record);
// Put another record in
Map<String, Double> mapData2 = new TreeMap<String, Double>() {{
put("key1", 3.0);
put("key2", 4.0);
}};
record = TestOrcUtils.buildComplexAvroRecord(null, mapData2, "XYZ", 4L, Arrays.asList(100, 200));
fileWriter.append(record);
fileWriter.flush();
fileWriter.close();
out.close();
Map<String,String> attributes = new HashMap<String,String>(){{
put(CoreAttributes.FILENAME.key(), "test");
}};
runner.enqueue(out.toByteArray(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1);
// Write the flow file out to disk, since the ORC Reader needs a path
MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0);
assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS complex_record " +
"(myInt INT, myMap MAP<STRING, DOUBLE>, myEnum STRING, myLongOrFloat UNIONTYPE<BIGINT, FLOAT>, myIntList ARRAY<INT>)"
+ " STORED AS ORC", resultFlowFile.getAttribute(ConvertAvroToORC.HIVE_DDL_ATTRIBUTE));
assertEquals("2", resultFlowFile.getAttribute(ConvertAvroToORC.RECORD_COUNT_ATTRIBUTE));
assertEquals("test.orc", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
FileOutputStream fos = new FileOutputStream("target/test1.orc");
fos.write(resultContents);
fos.flush();
fos.close();
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
Reader reader = OrcFile.createReader(new Path("target/test1.orc"), OrcFile.readerOptions(conf).filesystem(fs));
RecordReader rows = reader.rows();
VectorizedRowBatch batch = reader.getSchema().createRowBatch();
assertTrue(rows.nextBatch(batch));
assertTrue(batch.cols[0] instanceof LongColumnVector);
assertEquals(10, ((LongColumnVector) batch.cols[0]).vector[0]);
assertTrue(batch.cols[1] instanceof MapColumnVector);
assertTrue(batch.cols[2] instanceof BytesColumnVector);
assertTrue(batch.cols[3] instanceof UnionColumnVector);
StringBuilder buffer = new StringBuilder();
batch.cols[3].stringifyValue(buffer, 1);
assertEquals("{\"tag\": 0, \"value\": 4}", buffer.toString());
assertTrue(batch.cols[4] instanceof ListColumnVector);
}
@Test
public void test_onTrigger_multiple_batches() throws Exception {
Schema recordSchema = TestOrcUtils.buildPrimitiveAvroSchema();
DatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(recordSchema);
DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(writer);
ByteArrayOutputStream out = new ByteArrayOutputStream();
fileWriter.create(recordSchema, out);
GenericData.Record record;
for (int i = 1;i<=2000;i++) {
record = TestOrcUtils.buildPrimitiveAvroRecord(i, 2L * i, true, 30.0f * i, 40L * i, StandardCharsets.UTF_8.encode("Hello"), "World");
fileWriter.append(record);
}
fileWriter.flush();
fileWriter.close();
out.close();
runner.enqueue(out.toByteArray());
runner.run();
runner.assertAllFlowFilesTransferred(ConvertAvroToORC.REL_SUCCESS, 1);
// Write the flow file out to disk, since the ORC Reader needs a path
MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToORC.REL_SUCCESS).get(0);
assertEquals("2000", resultFlowFile.getAttribute(ConvertAvroToORC.RECORD_COUNT_ATTRIBUTE));
byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
FileOutputStream fos = new FileOutputStream("target/test1.orc");
fos.write(resultContents);
fos.flush();
fos.close();
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
Reader reader = OrcFile.createReader(new Path("target/test1.orc"), OrcFile.readerOptions(conf).filesystem(fs));
RecordReader rows = reader.rows();
VectorizedRowBatch batch = reader.getSchema().createRowBatch();
assertTrue(rows.nextBatch(batch));
// At least 2 batches were created
assertTrue(rows.nextBatch(batch));
}
}

View File

@@ -0,0 +1,555 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util.orc;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for the OrcUtils helper class
*/
public class TestOrcUtils {
@Test
public void test_getOrcField_primitive() throws Exception {
// Expected ORC types
TypeDescription[] expectedTypes = {
TypeDescription.createInt(),
TypeDescription.createLong(),
TypeDescription.createBoolean(),
TypeDescription.createFloat(),
TypeDescription.createDouble(),
TypeDescription.createBinary(),
TypeDescription.createString(),
};
// Build a fake Avro record with all types
Schema testSchema = buildPrimitiveAvroSchema();
List<Schema.Field> fields = testSchema.getFields();
for (int i = 0; i < fields.size(); i++) {
assertEquals(expectedTypes[i], OrcUtils.getOrcField(fields.get(i).schema()));
}
}
@Test
public void test_getOrcField_union_optional_type() throws Exception {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("union").type().unionOf().nullBuilder().endNull().and().booleanType().endUnion().noDefault();
Schema testSchema = builder.endRecord();
TypeDescription orcType = OrcUtils.getOrcField(testSchema.getField("union").schema());
assertEquals(TypeDescription.createBoolean(), orcType);
}
@Test
public void test_getOrcField_union() throws Exception {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("union").type().unionOf().intType().and().booleanType().endUnion().noDefault();
Schema testSchema = builder.endRecord();
TypeDescription orcType = OrcUtils.getOrcField(testSchema.getField("union").schema());
assertEquals(
TypeDescription.createUnion()
.addUnionChild(TypeDescription.createInt())
.addUnionChild(TypeDescription.createBoolean()),
orcType);
}
@Test
public void test_getOrcField_map() throws Exception {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("map").type().map().values().doubleType().noDefault();
Schema testSchema = builder.endRecord();
TypeDescription orcType = OrcUtils.getOrcField(testSchema.getField("map").schema());
assertEquals(
TypeDescription.createMap(TypeDescription.createString(), TypeDescription.createDouble()),
orcType);
}
@Test
public void test_getOrcField_nested_map() throws Exception {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("map").type().map().values().map().values().doubleType().noDefault();
Schema testSchema = builder.endRecord();
TypeDescription orcType = OrcUtils.getOrcField(testSchema.getField("map").schema());
assertEquals(
TypeDescription.createMap(TypeDescription.createString(),
TypeDescription.createMap(TypeDescription.createString(), TypeDescription.createDouble())),
orcType);
}
@Test
public void test_getOrcField_array() throws Exception {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("array").type().array().items().longType().noDefault();
Schema testSchema = builder.endRecord();
TypeDescription orcType = OrcUtils.getOrcField(testSchema.getField("array").schema());
assertEquals(
TypeDescription.createList(TypeDescription.createLong()),
orcType);
}
@Test
public void test_getOrcField_complex_array() throws Exception {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("array").type().array().items().map().values().floatType().noDefault();
Schema testSchema = builder.endRecord();
TypeDescription orcType = OrcUtils.getOrcField(testSchema.getField("array").schema());
assertEquals(
TypeDescription.createList(TypeDescription.createMap(TypeDescription.createString(), TypeDescription.createFloat())),
orcType);
}
@Test
public void test_getOrcField_record() throws Exception {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("int").type().intType().noDefault();
builder.name("long").type().longType().longDefault(1L);
builder.name("array").type().array().items().stringType().noDefault();
Schema testSchema = builder.endRecord();
TypeDescription orcType = OrcUtils.getOrcField(testSchema);
assertEquals(
TypeDescription.createStruct()
.addField("int", TypeDescription.createInt())
.addField("long", TypeDescription.createLong())
.addField("array", TypeDescription.createList(TypeDescription.createString())),
orcType);
}
@Test
public void test_getOrcField_enum() throws Exception {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("enumField").type().enumeration("enum").symbols("a", "b", "c").enumDefault("a");
Schema testSchema = builder.endRecord();
TypeDescription orcType = OrcUtils.getOrcField(testSchema.getField("enumField").schema());
assertEquals(TypeDescription.createString(), orcType);
}
@Test
public void test_getPrimitiveOrcTypeFromPrimitiveAvroType() throws Exception {
// Expected ORC types
TypeDescription[] expectedTypes = {
TypeDescription.createInt(),
TypeDescription.createLong(),
TypeDescription.createBoolean(),
TypeDescription.createFloat(),
TypeDescription.createDouble(),
TypeDescription.createBinary(),
TypeDescription.createString(),
};
Schema testSchema = buildPrimitiveAvroSchema();
List<Schema.Field> fields = testSchema.getFields();
for (int i = 0; i < fields.size(); i++) {
assertEquals(expectedTypes[i], OrcUtils.getPrimitiveOrcTypeFromPrimitiveAvroType(fields.get(i).schema().getType()));
}
}
@Test(expected = IllegalArgumentException.class)
public void test_getPrimitiveOrcTypeFromPrimitiveAvroType_badType() throws Exception {
Schema.Type nonPrimitiveType = Schema.Type.ARRAY;
OrcUtils.getPrimitiveOrcTypeFromPrimitiveAvroType(nonPrimitiveType);
}
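    // Writes one record of primitive values into a VectorizedRowBatch and checks slot 0 of each column:
    // ints and booleans land in LongColumnVectors, floats and doubles in DoubleColumnVectors,
    // bytes and strings in BytesColumnVectors.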
@Test
public void test_putRowToBatch() {
TypeDescription orcSchema = buildPrimitiveOrcSchema();
VectorizedRowBatch batch = orcSchema.createRowBatch();
Schema avroSchema = buildPrimitiveAvroSchema();
List<Schema.Field> fields = avroSchema.getFields();
GenericData.Record record = buildPrimitiveAvroRecord(1, 2L, false, 1.0f, 3.0, ByteBuffer.wrap("Hello".getBytes()), "World");
for (int i = 0; i < fields.size(); i++) {
OrcUtils.putToRowBatch(batch.cols[i], new MutableInt(0), 0, fields.get(i).schema(), record.get(i));
}
assertEquals(1, ((LongColumnVector) batch.cols[0]).vector[0]);
assertEquals(2, ((LongColumnVector) batch.cols[1]).vector[0]);
assertEquals(0, ((LongColumnVector) batch.cols[2]).vector[0]);
assertEquals(1.0, ((DoubleColumnVector) batch.cols[3]).vector[0], Double.MIN_NORMAL);
assertEquals(3.0, ((DoubleColumnVector) batch.cols[4]).vector[0], Double.MIN_NORMAL);
assertEquals("Hello", ((BytesColumnVector) batch.cols[5]).toString(0));
assertEquals("World", ((BytesColumnVector) batch.cols[6]).toString(0));
}
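    // Values written to a non-nullable union should land in the child vector matching their runtime type,
    // leaving the other child's slot at its default (zero).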
@Test
public void test_putRowToBatch_union() {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("union").type().unionOf().intType().and().floatType().endUnion().noDefault();
Schema testSchema = builder.endRecord();
GenericData.Record row = new GenericData.Record(testSchema);
row.put("union", 2);
TypeDescription orcSchema = TypeDescription.createUnion()
.addUnionChild(TypeDescription.createInt())
.addUnionChild(TypeDescription.createFloat());
VectorizedRowBatch batch = orcSchema.createRowBatch();
batch.ensureSize(2);
OrcUtils.putToRowBatch(batch.cols[0], new MutableInt(0), 0, testSchema.getField("union").schema(), row.get("union"));
UnionColumnVector union = ((UnionColumnVector) batch.cols[0]);
// verify the value is in the union field of type 'int'
assertEquals(2, ((LongColumnVector) union.fields[0]).vector[0]);
assertEquals(0.0, ((DoubleColumnVector) union.fields[1]).vector[0], Double.MIN_NORMAL);
row.put("union", 2.0f);
OrcUtils.putToRowBatch(batch.cols[0], new MutableInt(0), 1, testSchema.getField("union").schema(), row.get("union"));
union = ((UnionColumnVector) batch.cols[0]);
        // verify the value is in the union field of type 'float' (ORC stores float values in a DoubleColumnVector)
assertEquals(0, ((LongColumnVector) union.fields[0]).vector[1]);
assertEquals(2.0, ((DoubleColumnVector) union.fields[1]).vector[1], Double.MIN_NORMAL);
}
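    // A nullable union (null + float) is flattened to its single non-null type,
    // so the column is a plain DoubleColumnVector rather than a UnionColumnVector.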
@Test
public void test_putRowToBatch_optional_union() {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("union").type().unionOf().nullType().and().floatType().endUnion().noDefault();
Schema testSchema = builder.endRecord();
GenericData.Record row = new GenericData.Record(testSchema);
row.put("union", 2.0f);
TypeDescription orcSchema = TypeDescription.createFloat();
VectorizedRowBatch batch = orcSchema.createRowBatch();
batch.ensureSize(2);
OrcUtils.putToRowBatch(batch.cols[0], new MutableInt(0), 0, testSchema.getField("union").schema(), row.get("union"));
assertTrue(batch.cols[0] instanceof DoubleColumnVector);
DoubleColumnVector union = ((DoubleColumnVector) batch.cols[0]);
        // verify the value went directly into the float column (no union vector is created for a nullable single-type union)
assertEquals(2.0, union.vector[0], Double.MIN_NORMAL);
}
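    // Two rows of int arrays share one ListColumnVector; the MutableInt tracks the running offset
    // into the child vector, and array.offsets[1] marks where the second row's values begin.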
@Test
public void test_putRowToBatch_array_ints() throws Exception {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("array").type().array().items().intType().noDefault();
Schema testSchema = builder.endRecord();
GenericData.Record row = new GenericData.Record(testSchema);
int[] data1 = {1, 2, 3, 4, 5};
row.put("array", data1);
TypeDescription orcSchema = OrcUtils.getOrcField(testSchema.getField("array").schema());
VectorizedRowBatch batch = orcSchema.createRowBatch();
batch.ensureSize(2);
MutableInt vectorOffset = new MutableInt(0);
OrcUtils.putToRowBatch(batch.cols[0], vectorOffset, 0, testSchema.getField("array").schema(), row.get("array"));
int[] data2 = {10, 20, 30, 40};
row.put("array", data2);
OrcUtils.putToRowBatch(batch.cols[0], vectorOffset, 1, testSchema.getField("array").schema(), row.get("array"));
ListColumnVector array = ((ListColumnVector) batch.cols[0]);
LongColumnVector dataColumn = ((LongColumnVector) array.child);
// Check the first row, entries 0..4 should have values 1..5
for (int i = 0; i < 5; i++) {
assertEquals(i + 1, dataColumn.vector[i]);
}
// Check the second row, entries 5..8 should have values 10..40 (by tens)
for (int i = 0; i < 4; i++) {
assertEquals((i + 1) * 10, dataColumn.vector[(int) array.offsets[1] + i]);
}
assertEquals(0, dataColumn.vector[9]);
}
@Test
public void test_putRowToBatch_array_floats() throws Exception {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("array").type().array().items().floatType().noDefault();
Schema testSchema = builder.endRecord();
GenericData.Record row = new GenericData.Record(testSchema);
float[] data1 = {1.0f, 2.0f, 3.0f};
row.put("array", data1);
TypeDescription orcSchema = OrcUtils.getOrcField(testSchema.getField("array").schema());
VectorizedRowBatch batch = orcSchema.createRowBatch();
batch.ensureSize(2);
MutableInt vectorOffset = new MutableInt(0);
OrcUtils.putToRowBatch(batch.cols[0], vectorOffset, 0, testSchema.getField("array").schema(), row.get("array"));
float[] data2 = {40.0f, 41.0f, 42.0f, 43.0f};
row.put("array", data2);
OrcUtils.putToRowBatch(batch.cols[0], vectorOffset, 1, testSchema.getField("array").schema(), row.get("array"));
ListColumnVector array = ((ListColumnVector) batch.cols[0]);
DoubleColumnVector dataColumn = ((DoubleColumnVector) array.child);
        // Check the first row, entries 0..2 should have values 1.0..3.0
for (int i = 0; i < 3; i++) {
assertEquals(i + 1.0f, dataColumn.vector[i], Float.MIN_NORMAL);
}
        // Check the second row, entries 3..6 should have values 40.0..43.0
for (int i = 0; i < 4; i++) {
assertEquals((i + 40.0f), dataColumn.vector[(int) array.offsets[1] + i], Float.MIN_NORMAL);
}
assertEquals(0.0f, dataColumn.vector[9], Float.MIN_NORMAL);
}
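    // Same as the array tests above, but the Avro array values are supplied as a java.util.List
    // rather than a primitive array.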
@Test
public void test_putRowToBatch_list_doubles() throws Exception {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("array").type().array().items().doubleType().noDefault();
Schema testSchema = builder.endRecord();
GenericData.Record row = new GenericData.Record(testSchema);
List<Double> data1 = Arrays.asList(1.0, 2.0, 3.0);
row.put("array", data1);
TypeDescription orcSchema = OrcUtils.getOrcField(testSchema.getField("array").schema());
VectorizedRowBatch batch = orcSchema.createRowBatch();
batch.ensureSize(2);
MutableInt vectorOffset = new MutableInt(0);
OrcUtils.putToRowBatch(batch.cols[0], vectorOffset, 0, testSchema.getField("array").schema(), row.get("array"));
List<Double> data2 = Arrays.asList(40.0, 41.0, 42.0, 43.0);
row.put("array", data2);
OrcUtils.putToRowBatch(batch.cols[0], vectorOffset, 1, testSchema.getField("array").schema(), row.get("array"));
ListColumnVector array = ((ListColumnVector) batch.cols[0]);
DoubleColumnVector dataColumn = ((DoubleColumnVector) array.child);
        // Check the first row, entries 0..2 should have values 1.0..3.0
for (int i = 0; i < 3; i++) {
assertEquals(i + 1.0f, dataColumn.vector[i], Float.MIN_NORMAL);
}
        // Check the second row, entries 3..6 should have values 40.0..43.0
for (int i = 0; i < 4; i++) {
assertEquals((i + 40.0), dataColumn.vector[(int) array.offsets[1] + i], Float.MIN_NORMAL);
}
assertEquals(0.0, dataColumn.vector[9], Float.MIN_NORMAL);
}
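    // An array of maps becomes a ListColumnVector whose child is a MapColumnVector;
    // stringifyValue is used to verify each map's entries.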
@Test
public void test_putRowToBatch_array_of_maps() throws Exception {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("array").type().array().items().map().values().floatType().noDefault();
Schema testSchema = builder.endRecord();
Map<String, Float> map1 = new TreeMap<String, Float>() {{
put("key10", 10.0f);
put("key20", 20.0f);
}};
Map<String, Float> map2 = new TreeMap<String, Float>() {{
put("key101", 101.0f);
put("key202", 202.0f);
}};
Map[] maps = new Map[]{map1, map2, null};
GenericData.Record row = new GenericData.Record(testSchema);
row.put("array", maps);
TypeDescription orcSchema = OrcUtils.getOrcField(testSchema.getField("array").schema());
VectorizedRowBatch batch = orcSchema.createRowBatch();
OrcUtils.putToRowBatch(batch.cols[0], new MutableInt(0), 0, testSchema.getField("array").schema(), row.get("array"));
ListColumnVector array = ((ListColumnVector) batch.cols[0]);
MapColumnVector map = ((MapColumnVector) array.child);
StringBuilder buffer = new StringBuilder();
map.stringifyValue(buffer, 0);
assertEquals("[{\"key\": \"key10\", \"value\": 10.0}, {\"key\": \"key20\", \"value\": 20.0}]", buffer.toString());
buffer = new StringBuilder();
map.stringifyValue(buffer, 1);
assertEquals("[{\"key\": \"key101\", \"value\": 101.0}, {\"key\": \"key202\", \"value\": 202.0}]", buffer.toString());
}
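    // Two map values written at rows 0 and 1 of a MapColumnVector, verified via stringifyValue.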
@Test
public void test_putRowToBatch_primitive_map() throws Exception {
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("testRecord").namespace("any.data").fields();
builder.name("map").type().map().values().longType().noDefault();
Schema testSchema = builder.endRecord();
Map<String, Long> mapData1 = new TreeMap<String, Long>() {{
put("key10", 100L);
put("key20", 200L);
}};
GenericData.Record row = new GenericData.Record(testSchema);
row.put("map", mapData1);
TypeDescription orcSchema = OrcUtils.getOrcField(testSchema.getField("map").schema());
VectorizedRowBatch batch = orcSchema.createRowBatch();
batch.ensureSize(2);
MutableInt vectorOffset = new MutableInt(0);
OrcUtils.putToRowBatch(batch.cols[0], vectorOffset, 0, testSchema.getField("map").schema(), row.get("map"));
Map<String, Long> mapData2 = new TreeMap<String, Long>() {{
put("key1000", 1000L);
put("key2000", 2000L);
}};
OrcUtils.putToRowBatch(batch.cols[0], vectorOffset, 1, testSchema.getField("map").schema(), mapData2);
MapColumnVector map = ((MapColumnVector) batch.cols[0]);
StringBuilder buffer = new StringBuilder();
map.stringifyValue(buffer, 0);
assertEquals("[{\"key\": \"key10\", \"value\": 100}, {\"key\": \"key20\", \"value\": 200}]", buffer.toString());
buffer = new StringBuilder();
map.stringifyValue(buffer, 1);
assertEquals("[{\"key\": \"key1000\", \"value\": 1000}, {\"key\": \"key2000\", \"value\": 2000}]", buffer.toString());
}
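    // Primitive Avro types should map to the corresponding Hive column types, in field order.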
@Test
public void test_getHiveTypeFromAvroType_primitive() throws Exception {
        // Expected Hive types
String[] expectedTypes = {
"INT",
"BIGINT",
"BOOLEAN",
"FLOAT",
"DOUBLE",
"BINARY",
"STRING",
};
Schema testSchema = buildPrimitiveAvroSchema();
List<Schema.Field> fields = testSchema.getFields();
for (int i = 0; i < fields.size(); i++) {
assertEquals(expectedTypes[i], OrcUtils.getHiveTypeFromAvroType(fields.get(i).schema()));
}
}
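    // Complex Avro fields should map to Hive MAP/UNIONTYPE/ARRAY types,
    // and the record itself to a STRUCT of those column types.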
@Test
public void test_getHiveTypeFromAvroType_complex() throws Exception {
        // Expected Hive types
String[] expectedTypes = {
"INT",
"MAP<STRING, DOUBLE>",
"STRING",
"UNIONTYPE<BIGINT, FLOAT>",
"ARRAY<INT>"
};
Schema testSchema = buildComplexAvroSchema();
List<Schema.Field> fields = testSchema.getFields();
for (int i = 0; i < fields.size(); i++) {
assertEquals(expectedTypes[i], OrcUtils.getHiveTypeFromAvroType(fields.get(i).schema()));
}
assertEquals("STRUCT<myInt:INT, myMap:MAP<STRING, DOUBLE>, myEnum:STRING, myLongOrFloat:UNIONTYPE<BIGINT, FLOAT>, myIntList:ARRAY<INT>>",
OrcUtils.getHiveTypeFromAvroType(testSchema));
}
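    // The generated DDL should declare an external, ORC-backed table with one column per Avro field.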
@Test
public void test_generateHiveDDL_primitive() throws Exception {
Schema avroSchema = buildPrimitiveAvroSchema();
String ddl = OrcUtils.generateHiveDDL(avroSchema, "myHiveTable");
assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS myHiveTable (int INT, long BIGINT, boolean BOOLEAN, float FLOAT, double DOUBLE, bytes BINARY, string STRING)"
+ " STORED AS ORC", ddl);
}
@Test
public void test_generateHiveDDL_complex() throws Exception {
Schema avroSchema = buildComplexAvroSchema();
String ddl = OrcUtils.generateHiveDDL(avroSchema, "myHiveTable");
assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS myHiveTable "
+ "(myInt INT, myMap MAP<STRING, DOUBLE>, myEnum STRING, myLongOrFloat UNIONTYPE<BIGINT, FLOAT>, myIntList ARRAY<INT>)"
+ " STORED AS ORC", ddl);
}
//////////////////
// Helper methods
//////////////////
public static Schema buildPrimitiveAvroSchema() {
// Build a fake Avro record with all primitive types
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("test.record").namespace("any.data").fields();
builder.name("int").type().intType().noDefault();
builder.name("long").type().longType().longDefault(1L);
builder.name("boolean").type().booleanType().booleanDefault(true);
builder.name("float").type().floatType().floatDefault(0.0f);
builder.name("double").type().doubleType().doubleDefault(0.0);
builder.name("bytes").type().bytesType().noDefault();
builder.name("string").type().stringType().stringDefault("default");
return builder.endRecord();
}
public static GenericData.Record buildPrimitiveAvroRecord(int i, long l, boolean b, float f, double d, ByteBuffer bytes, String string) {
Schema schema = buildPrimitiveAvroSchema();
GenericData.Record row = new GenericData.Record(schema);
row.put("int", i);
row.put("long", l);
row.put("boolean", b);
row.put("float", f);
row.put("double", d);
row.put("bytes", bytes);
row.put("string", string);
return row;
}
public static TypeDescription buildPrimitiveOrcSchema() {
return TypeDescription.createStruct()
.addField("int", TypeDescription.createInt())
.addField("long", TypeDescription.createLong())
.addField("boolean", TypeDescription.createBoolean())
.addField("float", TypeDescription.createFloat())
.addField("double", TypeDescription.createDouble())
.addField("bytes", TypeDescription.createBinary())
.addField("string", TypeDescription.createString());
}
public static Schema buildComplexAvroSchema() {
// Build a fake Avro record with nested types
final SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.record("complex.record").namespace("any.data").fields();
builder.name("myInt").type().unionOf().nullType().and().intType().endUnion().nullDefault();
builder.name("myMap").type().map().values().doubleType().noDefault();
builder.name("myEnum").type().enumeration("myEnum").symbols("ABC", "DEF", "XYZ").enumDefault("ABC");
builder.name("myLongOrFloat").type().unionOf().longType().and().floatType().endUnion().noDefault();
builder.name("myIntList").type().array().items().intType().noDefault();
return builder.endRecord();
}
public static GenericData.Record buildComplexAvroRecord(Integer i, Map<String, Double> m, String e, Object unionVal, List<Integer> intArray) {
Schema schema = buildComplexAvroSchema();
GenericData.Record row = new GenericData.Record(schema);
row.put("myInt", i);
row.put("myMap", m);
row.put("myEnum", e);
row.put("myLongOrFloat", unionVal);
row.put("myIntList", intArray);
return row;
}
}