mirror of
synced 2025-02-22 10:29:24 +00:00
NIFI-7989: Add UpdateHiveTable processors for data drift capability
NIFI-7989: Allow for optional blank line after optional column and partition headers. NIFI-7989: Incorporated review comments. NIFI-7989: Close Statement when finishing processing. NIFI-7989: Remove database name property, update output table attribute. This closes #4653. Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
@ -101,6 +101,11 @@
@ -117,5 +122,11 @@
@ -43,6 +43,13 @@ import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import java.io.IOException;
import java.io.OutputStream;
@ -418,6 +425,86 @@ public class NiFiOrcUtils {
throw new IllegalArgumentException("Error converting Avro type " + avroType.getName() + " to Hive type");
public static String getHiveTypeFromFieldType(DataType rawDataType, boolean hiveFieldNames) {
if (rawDataType == null) {
throw new IllegalArgumentException("Field type is null");
RecordFieldType dataType = rawDataType.getFieldType();
if (RecordFieldType.INT.equals(dataType)) {
return "INT";
if (RecordFieldType.LONG.equals(dataType)) {
return "BIGINT";
if (RecordFieldType.BOOLEAN.equals(dataType)) {
return "BOOLEAN";
if (RecordFieldType.DOUBLE.equals(dataType)) {
return "DOUBLE";
if (RecordFieldType.FLOAT.equals(dataType)) {
return "FLOAT";
if (RecordFieldType.DECIMAL.equals(dataType)) {
return "DECIMAL";
if (RecordFieldType.STRING.equals(dataType) || RecordFieldType.ENUM.equals(dataType)) {
return "STRING";
if (RecordFieldType.DATE.equals(dataType)) {
return "DATE";
if (RecordFieldType.TIME.equals(dataType)) {
return "INT";
if (RecordFieldType.TIMESTAMP.equals(dataType)) {
return "TIMESTAMP";
if (RecordFieldType.ARRAY.equals(dataType)) {
ArrayDataType arrayDataType = (ArrayDataType) rawDataType;
if (RecordFieldType.BYTE.getDataType().equals(arrayDataType.getElementType())) {
return "BINARY";
return "ARRAY<" + getHiveTypeFromFieldType(arrayDataType.getElementType(), hiveFieldNames) + ">";
if (RecordFieldType.MAP.equals(dataType)) {
MapDataType mapDataType = (MapDataType) rawDataType;
return "MAP<STRING, " + getHiveTypeFromFieldType(mapDataType.getValueType(), hiveFieldNames) + ">";
if (RecordFieldType.CHOICE.equals(dataType)) {
ChoiceDataType choiceDataType = (ChoiceDataType) rawDataType;
List<DataType> unionFieldSchemas = choiceDataType.getPossibleSubTypes();
if (unionFieldSchemas != null) {
// Ignore null types in union
List<String> hiveFields = unionFieldSchemas.stream()
.map((it) -> getHiveTypeFromFieldType(it, hiveFieldNames))
// Flatten the field if the union only has one non-null element
return (hiveFields.size() == 1)
? hiveFields.get(0)
: "UNIONTYPE<" + StringUtils.join(hiveFields, ", ") + ">";
return null;
if (RecordFieldType.RECORD.equals(dataType)) {
RecordDataType recordDataType = (RecordDataType) rawDataType;
List<RecordField> recordFields = recordDataType.getChildSchema().getFields();
if (recordFields != null) {
List<String> hiveFields = recordFields.stream().map(
recordField -> ("`" + (hiveFieldNames ? recordField.getFieldName().toLowerCase() : recordField.getFieldName()) + "`:"
+ getHiveTypeFromFieldType(recordField.getDataType(), hiveFieldNames))).collect(Collectors.toList());
return "STRUCT<" + StringUtils.join(hiveFields, ", ") + ">";
return null;
throw new IllegalArgumentException("Error converting Avro type " + dataType.name() + " to Hive type");
public static OrcFlowFileWriter createWriter(OutputStream flowFileOutputStream,
Path path,
@ -0,0 +1,481 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.hive;
import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.dbcp.hive.HiveDBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.DiscontinuedException;
import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@Tags({"hive", "metadata", "jdbc", "database", "table"})
@CapabilityDescription("This processor uses a Hive JDBC connection and incoming records to generate any Hive 1.2 table changes needed to support the incoming records.")
@WritesAttribute(attribute = "output.table", description = "This attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the target table name."),
@WritesAttribute(attribute = "output.path", description = "This attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the path on the file system to the table (or partition location if the table is partitioned).")
public class UpdateHiveTable extends AbstractProcessor {
// Hive storage-format keywords. Each one is passed as both of the first two constructor
// arguments of the matching *_STORAGE AllowableValue below.
static final String TEXTFILE = "TEXTFILE";
static final String SEQUENCEFILE = "SEQUENCEFILE";
static final String ORC = "ORC";
static final String PARQUET = "PARQUET";
static final String AVRO = "AVRO";
static final String RCFILE = "RCFILE";
// Allowable values describing each storage format; the third argument is the user-facing description.
static final AllowableValue TEXTFILE_STORAGE = new AllowableValue(TEXTFILE, TEXTFILE, "Stored as plain text files. TEXTFILE is the default file format, unless the configuration "
+ "parameter hive.default.fileformat has a different setting.");
static final AllowableValue SEQUENCEFILE_STORAGE = new AllowableValue(SEQUENCEFILE, SEQUENCEFILE, "Stored as compressed Sequence Files.");
static final AllowableValue ORC_STORAGE = new AllowableValue(ORC, ORC, "Stored as ORC file format. Supports ACID Transactions & Cost-based Optimizer (CBO). "
+ "Stores column-level metadata.");
static final AllowableValue PARQUET_STORAGE = new AllowableValue(PARQUET, PARQUET, "Stored as Parquet format for the Parquet columnar storage format.");
static final AllowableValue AVRO_STORAGE = new AllowableValue(AVRO, AVRO, "Stored as Avro format.");
static final AllowableValue RCFILE_STORAGE = new AllowableValue(RCFILE, RCFILE, "Stored as Record Columnar File format.");
// Strategies for a target table that does not exist. onTrigger compares the configured value
// against CREATE_IF_NOT_EXISTS.getValue() to decide whether table creation is allowed.
static final AllowableValue CREATE_IF_NOT_EXISTS = new AllowableValue("Create If Not Exists", "Create If Not Exists",
"Create a table with the given schema if it does not already exist");
static final AllowableValue FAIL_IF_NOT_EXISTS = new AllowableValue("Fail If Not Exists", "Fail If Not Exists",
"If the target does not already exist, log an error and route the flowfile to failure");
// Names of the FlowFile attributes this processor writes: the target table name and the
// table (or partition) location.
static final String ATTR_OUTPUT_TABLE = "output.table";
static final String ATTR_OUTPUT_PATH = "output.path";
// Properties
// NOTE(review): in this extract every builder chain below is truncated - only displayName() and
// description() are visible; the name, validators/allowable values and the terminating build() call
// are not shown, so nothing is documented here about them.

// Controller service used to obtain a RecordReader; onTrigger only asks the reader for its schema.
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.displayName("Record Reader")
.description("The service for reading incoming flow files. The reader is only used to determine the schema of the records, the actual records will not be processed.")
// Controller service that supplies the JDBC connection to Hive.
static final PropertyDescriptor HIVE_DBCP_SERVICE = new PropertyDescriptor.Builder()
.displayName("Hive Database Connection Pooling Service")
.description("The Hive Controller Service that is used to obtain connection(s) to the Hive database")
// Target table; onTrigger evaluates Expression Language against the incoming FlowFile for this property.
// NOTE(review): the description refers to the "Create Table" property, whose display name below is
// "Create Table Strategy".
static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.displayName("Table Name")
.description("The name of the database table to update. If the table does not exist, then it will either be created or an error thrown, depending "
+ "on the value of the Create Table property.")
// Selects between CREATE_IF_NOT_EXISTS and FAIL_IF_NOT_EXISTS.
// NOTE(review): "e.g." in the description reads like it was meant to be "etc.".
static final PropertyDescriptor CREATE_TABLE = new PropertyDescriptor.Builder()
.displayName("Create Table Strategy")
.description("Specifies how to process the target table when it does not exist (create it, fail, e.g.).")
// Storage format passed to checkAndUpdateTableSchema for use when a table is created.
static final PropertyDescriptor TABLE_STORAGE_FORMAT = new PropertyDescriptor.Builder()
.displayName("Create Table Storage Format")
.description("If a table is to be created, the specified storage format will be used.")
// NOTE(review): no use of QUERY_TIMEOUT is visible in onTrigger or checkAndUpdateTableSchema in this
// extract - confirm the timeout is actually applied to the Statement.
static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
.displayName("Query timeout")
.description("Sets the number of seconds the driver will wait for a query to execute. "
+ "A value of 0 means no timeout. NOTE: Non-zero values may not be supported by the driver.")
// Comma-separated partition values; onTrigger evaluates Expression Language, splits on ',' and trims each value.
// NOTE(review): the description mentions a "partition.location" value, but the attribute written by this
// processor is "output.path" (ATTR_OUTPUT_PATH).
static final PropertyDescriptor STATIC_PARTITION_VALUES = new PropertyDescriptor.Builder()
.displayName("Static Partition Values")
.description("Specifies a comma-separated list of the values for the partition columns of the target table. This assumes all incoming records belong to the same partition "
+ "and the partition columns are not fields in the record. If specified, this property will often contain "
+ "Expression Language. For example if PartitionRecord is upstream and two partition columns 'name' and 'age' are used, then this property can be set to "
+ "${name},${age}. This property must be set if the table is partitioned, and must not be set if the table is not partitioned. If this property is set, the values "
+ "will be used as the partition values, and the partition.location value will reflect the location of the partition in the filesystem (for use downstream in "
+ "processors like PutHDFS).")
// Relationships
// NOTE(review): the builder chains are truncated in this extract (no name or build() call visible).
// NOTE(review): both descriptions speak of records being "transmitted to Hive", while onTrigger only
// reads the schema and issues table DDL - confirm the wording is intended.

// Destination for FlowFiles whose table check/update completed (see onTrigger).
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.description("A FlowFile containing records routed to this relationship after the record has been successfully transmitted to Hive.")
// Destination for FlowFiles whose reader could not be created or whose table check/update threw.
public static final Relationship REL_FAILURE = new Relationship.Builder()
.description("A FlowFile containing records routed to this relationship if the record could not be transmitted to Hive.")
// Unmodifiable views assigned in init() and returned by the accessor methods.
private List<PropertyDescriptor> propertyDescriptors;
private Set<Relationship> relationships;
/**
 * Builds the unmodifiable property-descriptor list and relationship set exposed by this processor.
 *
 * @param context the initialization context (not referenced by the visible statements)
 */
protected void init(ProcessorInitializationContext context) {
// NOTE(review): the statements that add descriptors to props are not visible in this extract.
List<PropertyDescriptor> props = new ArrayList<>();
propertyDescriptors = Collections.unmodifiableList(props);
// NOTE(review): likewise, the statements that add relationships to _relationships are not visible.
Set<Relationship> _relationships = new HashSet<>();
relationships = Collections.unmodifiableSet(_relationships);
/**
 * @return the unmodifiable property-descriptor list assigned in {@code init}
 */
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
/**
 * @return the unmodifiable relationship set assigned in {@code init}
 */
public Set<Relationship> getRelationships() {
return relationships;
/**
 * Takes one FlowFile, reads its record schema, and brings the target Hive table in line with that schema.
 * <p>
 * On success the FlowFile gets the {@code output.table} attribute, a provenance event is reported against
 * the connection URL, and the FlowFile goes to REL_SUCCESS. Failure to create the reader, or an
 * IOException/SQLException from the table update, routes to REL_FAILURE. A DiscontinuedException sends the
 * FlowFile back to Relationship.SELF. Anything else is rethrown as a ProcessException.
 * <p>
 * NOTE(review): several statements (early returns, the log.error(...) call heads, closing braces) are not
 * visible in this extract.
 *
 * @param context provides the configured property values and controller services
 * @param session the session the FlowFile is read from and transferred through
 * @throws ProcessException for any Throwable not handled by the narrower catch clauses
 */
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
// Nothing queued (the body of this branch is not visible in this extract)
if (flowFile == null) {
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
// Both the table name and the partition values are resolved against this FlowFile's attributes
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String staticPartitionValuesString = context.getProperty(STATIC_PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
// Stays null when no partition values were supplied; checkAndUpdateTableSchema treats null as "none given"
List<String> staticPartitionValues = null;
if (!StringUtils.isEmpty(staticPartitionValuesString)) {
// Split on commas and trim each value. NOTE(review): String.split does not produce null elements,
// so the nonNull filter looks like a no-op; empty values (e.g. from "a,,b") are kept.
staticPartitionValues = Arrays.stream(staticPartitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
final ComponentLog log = getLogger();
try {
final RecordReader reader;
try (final InputStream in = session.read(flowFile)) {
// if we fail to create the RecordReader then we want to route to failure, so we need to
// handle this separately from the other IOExceptions which normally route to retry
// NOTE(review): no retry relationship is visible in this file; IOExceptions below route to failure.
try {
reader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
} catch (Exception e) {
// Any reader-creation problem is normalized to RecordReaderFactoryException for the catch below
throw new RecordReaderFactoryException("Unable to create RecordReader", e);
} catch (RecordReaderFactoryException rrfe) {
// (arguments of a logging call whose first line is not visible in this extract)
"Failed to create {} for {} - routing to failure",
new Object[]{RecordReader.class.getSimpleName(), flowFile},
session.transfer(flowFile, REL_FAILURE);
// NOTE(review): the schema is requested after the try-with-resources on the input stream has ended,
// so this presumably relies on the reader having resolved its schema at creation time - confirm.
RecordSchema recordSchema = reader.getSchema();
final boolean createIfNotExists = context.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
final String storageFormat = context.getProperty(TABLE_STORAGE_FORMAT).getValue();
final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
// The connection is closed by try-with-resources whether or not the update succeeds
try (final Connection connection = dbcpService.getConnection()) {
checkAndUpdateTableSchema(session, flowFile, connection, recordSchema, tableName, staticPartitionValues, createIfNotExists, storageFormat);
flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName);
session.getProvenanceReporter().invokeRemoteProcess(flowFile, dbcpService.getConnectionURL());
session.transfer(flowFile, REL_SUCCESS);
} catch (IOException | SQLException e) {
// The table name is recorded on failed FlowFiles as well
flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName);
// (arguments of a logging call whose first line is not visible in this extract)
"Exception while processing {} - routing to failure",
new Object[]{flowFile},
session.transfer(flowFile, REL_FAILURE);
} catch (DiscontinuedException e) {
// The input FlowFile processing is discontinued. Keep it in the input queue.
getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
session.transfer(flowFile, Relationship.SELF);
} catch (Throwable t) {
// Pass ProcessExceptions through untouched; wrap everything else
throw (t instanceof ProcessException) ? (ProcessException) t : new ProcessException(t);
/**
 * Compares the record schema with the target Hive table and issues the DDL needed to reconcile them, then
 * writes the table (or partition) location to the {@code output.path} attribute.
 * <p>
 * If the table is not listed by SHOW TABLES and {@code createIfNotExists} is true, a CREATE TABLE statement
 * is built from all record fields. Otherwise the table is described, its column and partition-column names
 * are collected, an ALTER TABLE ... ADD COLUMNS statement is built for record fields that match neither,
 * and, when partition columns exist, an ALTER TABLE statement for the partition built from
 * {@code partitionValues}.
 * <p>
 * The method is {@code synchronized}, so calls on the same processor instance are serialized.
 * <p>
 * NOTE(review): many statements are not visible in this extract (list add() calls, the statement-execution
 * calls after each "Executing Hive DDL" log line, parts of the SQL builders, closing braces). Comments
 * below describe only what is shown.
 * NOTE(review): tableName and the partition values are concatenated into SQL text without quoting or
 * escaping; both can come from Expression Language over FlowFile attributes (see onTrigger).
 *
 * @param session           session used to write the output path attribute
 * @param flowFile          the FlowFile being processed
 * @param conn              open JDBC connection to Hive; not closed here
 * @param schema            schema of the incoming records
 * @param tableName         name of the table to create or alter
 * @param partitionValues   static partition values in partition-column order, or null if none were given
 * @param createIfNotExists whether a missing table may be created
 * @param storageFormat     storage format for a newly created table
 * @throws IOException if partition values are missing or too few for a partitioned table, or wrapping any
 *                     other exception raised while talking to Hive
 */
private synchronized void checkAndUpdateTableSchema(final ProcessSession session, final FlowFile flowFile, final Connection conn, final RecordSchema schema,
final String tableName, final List<String> partitionValues,
final boolean createIfNotExists, final String storageFormat) throws IOException {
// Read in the current table metadata, compare it to the reader's schema, and
// add any columns from the schema that are missing in the table
// The Statement is closed by try-with-resources; the ResultSets obtained from it are not closed explicitly.
try (Statement s = conn.createStatement()) {
// Determine whether the table exists
ResultSet tables = s.executeQuery("SHOW TABLES");
List<String> tableNames = new ArrayList<>();
String hiveTableName;
// Reads the first column of each row and stops at the first empty value
// (the loop body, presumably adding to tableNames, is not visible in this extract)
while (tables.next() && StringUtils.isNotEmpty(hiveTableName = tables.getString(1))) {
List<String> columnsToAdd = new ArrayList<>();
String outputPath;
// The lookup is an exact, case-sensitive match of tableName against the SHOW TABLES output
if (!tableNames.contains(tableName) && createIfNotExists) {
StringBuilder createTableStatement = new StringBuilder();
for (RecordField recordField : schema.getFields()) {
// NOTE(review): the field name is used as-is here, while the ALTER path below lower-cases it
String recordFieldName = recordField.getFieldName();
// The field does not exist in the table, add it
columnsToAdd.add(recordFieldName + " " + NiFiOrcUtils.getHiveTypeFromFieldType(recordField.getDataType(), true));
getLogger().debug("Adding column " + recordFieldName + " to table " + tableName);
// (the appends for the table name and storage format are not visible in this extract)
createTableStatement.append("CREATE TABLE IF NOT EXISTS ")
.append(" (")
.append(String.join(", ", columnsToAdd))
.append(") STORED AS ")
String createTableSql = createTableStatement.toString();
if (StringUtils.isNotEmpty(createTableSql)) {
// Perform the table create
getLogger().info("Executing Hive DDL: " + createTableSql);
// Now that the table is created, describe it and determine its location (for placing the flowfile downstream)
String describeTable = "DESC FORMATTED " + tableName;
ResultSet tableInfo = s.executeQuery(describeTable);
boolean moreRows = tableInfo.next();
boolean locationFound = false;
// Scan forward until the row whose first column starts with "Location:"; the cursor is left on that row
while (moreRows && !locationFound) {
// NOTE(review): a null first column would make startsWith throw; the outer catch wraps that in IOException
String line = tableInfo.getString(1);
if (line.startsWith("Location:")) {
locationFound = true;
continue; // Don't do a next() here, need to get the second column value
moreRows = tableInfo.next();
// NOTE(review): if no Location row was found the cursor is past the last row when this is read
outputPath = tableInfo.getString(2);
// Reached when the table is listed, and also when it is missing but creation is disabled
} else {
List<String> hiveColumns = new ArrayList<>();
String describeTable = "DESC FORMATTED " + tableName;
ResultSet tableInfo = s.executeQuery(describeTable);
// Result is 3 columns, col_name, data_type, comment. Check the first row for a header and skip if so, otherwise add column name
// (the cursor advance before this read is not visible in this extract)
String columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) {
// If the column was a header, check for a blank line to follow and skip it, otherwise add the column name
if (columnName.startsWith("#")) {
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName)) {
// Collect all column names
// Stops at the first row whose first column is empty, i.e. the blank separator after the column list
while (tableInfo.next() && StringUtils.isNotEmpty(columnName = tableInfo.getString(1))) {
// Collect all partition columns
boolean moreRows = true;
boolean headerFound = false;
// Advance until either the partition header or the detailed-information header is seen
while (moreRows && !headerFound) {
String line = tableInfo.getString(1);
if ("# Partition Information".equals(line)) {
headerFound = true;
} else if ("# Detailed Table Information".equals(line)) {
// Not partitioned, exit the loop with headerFound = false
moreRows = tableInfo.next();
List<String> partitionColumns = new ArrayList<>();
// SQL fragments of the form col='value' for the partition statement
List<String> partitionColumnsEqualsValueList = new ArrayList<>();
// Path fragments of the form col=value for the output location
List<String> partitionColumnsLocationList = new ArrayList<>();
if (headerFound) {
// If the table is partitioned, construct the partition=value strings for each partition column
String partitionColumnName;
// Same header / blank-line handling as for the regular column list above
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) {
// If the column was a header, check for a blank line to follow and skip it, otherwise add the column name
if (columnName.startsWith("#")) {
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName)) {
while (tableInfo.next() && StringUtils.isNotEmpty(partitionColumnName = tableInfo.getString(1))) {
final int partitionColumnsSize = partitionColumns.size();
// A partitioned table requires static partition values
if (partitionValues == null) {
throw new IOException("Found " + partitionColumnsSize + " partition columns but no Static Partition Values were supplied");
final int partitionValuesSize = partitionValues.size();
// Too few values is an error; surplus values beyond the partition columns are not used by the loop below
if (partitionValuesSize < partitionColumnsSize) {
throw new IOException("Found " + partitionColumnsSize + " partition columns but only " + partitionValuesSize + " Static Partition Values were supplied");
// Values are paired with partition columns by position
for (int i = 0; i < partitionColumns.size(); i++) {
partitionColumnsEqualsValueList.add(partitionColumns.get(i) + "='" + partitionValues.get(i) + "'");
// Add unquoted version for the output path
partitionColumnsLocationList.add(partitionColumns.get(i) + "=" + partitionValues.get(i));
// Get table location
// The flags are reused for the scan forward to the "Location:" row
moreRows = true;
headerFound = false;
while (moreRows && !headerFound) {
String line = tableInfo.getString(1);
if (line.startsWith("Location:")) {
headerFound = true;
continue; // Don't do a next() here, need to get the second column value
moreRows = tableInfo.next();
String tableLocation = tableInfo.getString(2);
StringBuilder alterTableStatement = new StringBuilder();
// Handle new columns
for (RecordField recordField : schema.getFields()) {
// Lower-cased before comparison with the column names read from the table description
String recordFieldName = recordField.getFieldName().toLowerCase();
if (!hiveColumns.contains(recordFieldName) && !partitionColumns.contains(recordFieldName)) {
// The field does not exist in the table (and is not a partition column), add it
columnsToAdd.add(recordFieldName + " " + NiFiOrcUtils.getHiveTypeFromFieldType(recordField.getDataType(), true));
getLogger().info("Adding column " + recordFieldName + " to table " + tableName);
String alterTableSql;
// An ADD COLUMNS statement is only built when at least one column is missing
if (!columnsToAdd.isEmpty()) {
alterTableStatement.append("ALTER TABLE ")
.append(" ADD COLUMNS (")
.append(String.join(", ", columnsToAdd))
alterTableSql = alterTableStatement.toString();
if (StringUtils.isNotEmpty(alterTableSql)) {
// Perform the table update
getLogger().info("Executing Hive DDL: " + alterTableSql);
// Default output location is the table itself; replaced below when a partition applies
outputPath = tableLocation;
// Handle new partitions
if (!partitionColumnsEqualsValueList.isEmpty()) {
// (parts of this concatenation are not visible in this extract)
alterTableSql = "ALTER TABLE " +
tableName +
String.join(", ", partitionColumnsEqualsValueList) +
if (StringUtils.isNotEmpty(alterTableSql)) {
// Perform the table update
getLogger().info("Executing Hive DDL: " + alterTableSql);
// Add attribute for HDFS location of the partition values
outputPath = tableLocation + "/" + String.join("/", partitionColumnsLocationList);
// NOTE(review): the FlowFile returned by putAttribute is discarded; the caller keeps using its own reference
session.putAttribute(flowFile, ATTR_OUTPUT_PATH, outputPath);
// Every exception, including the IOExceptions thrown above, is wrapped in a new IOException
} catch (Exception e) {
throw new IOException(e);
@ -16,3 +16,4 @@ org.apache.nifi.processors.hive.ConvertAvroToORC
@ -0,0 +1,376 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.hive;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.dbcp.hive.HiveDBCPService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.stubbing.Answer;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestUpdateHiveTable {
// Hadoop configuration resource loaded in setUp, and the directory setUp deletes before each test.
private static final String TEST_CONF_PATH = "src/test/resources/core-site.xml";
private static final String TARGET_HIVE = "target/hive";
// Canned "SHOW TABLES" output: the mock database contains the tables 'messages' and 'users'.
// NOTE(review): the closing "};" of each array initializer below is not visible in this extract.
private static final String[] SHOW_TABLES_COLUMN_NAMES = new String[]{"tab_name"};
private static final String[][] SHOW_TABLES_RESULTSET = new String[][]{
new String[]{"messages"},
new String[]{"users"},
// Canned description of a partitioned table: a "# col_name" header followed by a blank row, the
// columns id and msg, a "# Partition Information" section (again header + blank row) with the
// partition columns continent and country, then the detailed-information header and the Location row.
private static final String[] DESC_MESSAGES_TABLE_COLUMN_NAMES = new String[]{"id", "msg"};
private static final String[][] DESC_MESSAGES_TABLE_RESULTSET = new String[][]{
new String[]{"# col_name", "data_type", "comment"},
new String[]{"", null, null},
new String[]{"id", "int", ""},
new String[]{"msg", "string", ""},
new String[]{"", null, null},
new String[]{"# Partition Information", null, null},
new String[]{"# col_name", "data_type", "comment"},
new String[]{"", null, null},
new String[]{"continent", "string", ""},
new String[]{"country", "string", ""},
new String[]{"", null, null},
new String[]{"# Detailed Table Information", null, null},
new String[]{"Location:", "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/messages", null}
// Canned description of an unpartitioned table with no header row at all; its columns match the
// record schema built in configure().
private static final String[] DESC_USERS_TABLE_COLUMN_NAMES = new String[]{"name", "favorite_number", "favorite_color", "scale"};
private static final String[][] DESC_USERS_TABLE_RESULTSET = new String[][]{
new String[]{"name", "string", ""},
new String[]{"favorite_number", "int", ""},
new String[]{"favorite_color", "string", ""},
new String[]{"scale", "double", ""},
new String[]{"", null, null},
new String[]{"# Detailed Table Information", null, null},
new String[]{"Location:", "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/users", null}
// Canned description of the table created in testCreateTable: a header row that is NOT followed by a blank row.
// NOTE(review): the mock service refers to DESC_NEW_TABLE_COLUMN_NAMES, whose declaration is not
// visible in this extract.
private static final String[][] DESC_NEW_TABLE_RESULTSET = new String[][]{
new String[]{"# col_name", "data_type", "comment"},
new String[]{"name", "string", ""},
new String[]{"favorite_number", "int", ""},
new String[]{"favorite_color", "string", ""},
new String[]{"scale", "double", ""},
new String[]{"", null, null},
new String[]{"# Detailed Table Information", null, null},
new String[]{"Location:", "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/newTable", null}
// Temporary folder used by testSetup for the mock service's location argument.
public TemporaryFolder folder = new TemporaryFolder();
// Created per test by configure(); the processor instance is created in setUp().
private TestRunner runner;
private MockUpdateHiveTable processor;
/**
 * Per-test preparation: loads the test Hadoop configuration, removes the TARGET_HIVE directory left by
 * earlier runs, and creates a fresh processor instance.
 */
public void setUp() {
// NOTE(review): no later use of testConf is visible in this extract.
Configuration testConf = new Configuration();
testConf.addResource(new Path(TEST_CONF_PATH));
// Delete any temp files from previous tests
try {
FileUtils.deleteDirectory(new File(TARGET_HIVE));
} catch (IOException ioe) {
// Do nothing, directory may not have existed
processor = new MockUpdateHiveTable();
/**
 * Convenience overload: configures the runner with a reader that does not fail on creation
 * ({@code failOnCreateReader = false}, {@code failAfter = -1}).
 */
private void configure(final UpdateHiveTable processor, final int numUsers) throws InitializationException {
configure(processor, numUsers, false, -1);
/**
 * Convenience overload: same as the full variant with no custom record generator, so the default
 * generated user records are added.
 */
private void configure(final UpdateHiveTable processor, final int numUsers, boolean failOnCreateReader, int failAfter) throws InitializationException {
configure(processor, numUsers, failOnCreateReader, failAfter, null);
/**
 * Creates the TestRunner for the processor and registers a MockRecordParser as its Record Reader.
 * The reader's schema has the fields name (STRING), favorite_number (INT), favorite_color (STRING) and
 * scale (DOUBLE).
 *
 * @param processor          the processor under test
 * @param numUsers           number of records to generate, or the count handed to {@code recordGenerator}
 * @param failOnCreateReader when true, the overridden createRecordReader throws SchemaNotFoundException
 * @param failAfter          not referenced by the statements visible in this extract
 * @param recordGenerator    optional custom record producer; when null, numUsers default records are added
 */
private void configure(final UpdateHiveTable processor, final int numUsers, final boolean failOnCreateReader, final int failAfter,
final BiFunction<Integer, MockRecordParser, Void> recordGenerator) throws InitializationException {
runner = TestRunners.newTestRunner(processor);
MockRecordParser readerFactory = new MockRecordParser() {
// Lets a test simulate a reader that cannot be created
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException {
if (failOnCreateReader) {
throw new SchemaNotFoundException("test");
return super.createRecordReader(variables, in, inputLength, logger);
List<RecordField> fields = Arrays.asList(
new RecordField("name", RecordFieldType.STRING.getDataType()),
new RecordField("favorite_number", RecordFieldType.INT.getDataType()),
new RecordField("favorite_color", RecordFieldType.STRING.getDataType()),
new RecordField("scale", RecordFieldType.DOUBLE.getDataType())
final SimpleRecordSchema recordSchema = new SimpleRecordSchema(fields);
// Mirror the schema into the mock reader, field by field
for (final RecordField recordField : recordSchema.getFields()) {
readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
if (recordGenerator == null) {
// Default data: one record per user, with values derived from the index
for (int i = 0; i < numUsers; i++) {
readerFactory.addRecord("name" + i, i, "blue" + i, i * 10.0);
} else {
recordGenerator.apply(numUsers, readerFactory);
runner.addControllerService("mock-reader-factory", readerFactory);
runner.setProperty(UpdateHiveTable.RECORD_READER, "mock-reader-factory");
/**
 * Wires the processor up step by step: reader (via configure), the mock DBCP service, then the table name.
 * NOTE(review): no assertions or run call are visible for this test in this extract.
 */
public void testSetup() throws Exception {
configure(processor, 0);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.setProperty(UpdateHiveTable.HIVE_DBCP_SERVICE, "dbcp");
runner.setProperty(UpdateHiveTable.TABLE_NAME, "users");
/**
 * Targets the existing 'users' table, whose canned description already has every field of the record
 * schema, and expects the FlowFile on success with output.table = "users" and output.path equal to the
 * table's Location value.
 * NOTE(review): static partition values are set here although the canned 'users' description has no
 * partition section; the expected output path carries no partition suffix.
 * NOTE(review): despite the test name, no assertion on the executed statements is visible in this extract.
 */
public void testNoStatementsExecuted() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHiveTable.TABLE_NAME, "users");
final MockDBCPService service = new MockDBCPService("test");
runner.addControllerService("dbcp", service);
runner.setProperty(UpdateHiveTable.HIVE_DBCP_SERVICE, "dbcp");
runner.setProperty(UpdateHiveTable.STATIC_PARTITION_VALUES, "Asia,China");
runner.enqueue(new byte[0]);
runner.assertTransferCount(UpdateHiveTable.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHiveTable.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateHiveTable.ATTR_OUTPUT_TABLE, "users");
flowFile.assertAttributeEquals(UpdateHiveTable.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/users");
/**
 * Targets 'newTable', which the canned SHOW TABLES output does not list, with the create-if-not-exists
 * strategy and PARQUET storage. Expects success, output.table/output.path for the new table, and exactly
 * one executed statement: the CREATE TABLE built from the record schema.
 * The table name is supplied through Expression Language (the table.name attribute).
 */
public void testCreateTable() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHiveTable.TABLE_NAME, "${table.name}");
runner.setProperty(UpdateHiveTable.CREATE_TABLE, UpdateHiveTable.CREATE_IF_NOT_EXISTS);
runner.setProperty(UpdateHiveTable.TABLE_STORAGE_FORMAT, UpdateHiveTable.PARQUET);
final MockDBCPService service = new MockDBCPService("newTable");
runner.addControllerService("dbcp", service);
runner.setProperty(UpdateHiveTable.HIVE_DBCP_SERVICE, "dbcp");
Map<String, String> attrs = new HashMap<>();
// NOTE(review): db.name is not referenced by any property set in this test.
attrs.put("db.name", "default");
attrs.put("table.name", "newTable");
runner.enqueue(new byte[0], attrs);
runner.assertTransferCount(UpdateHiveTable.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHiveTable.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateHiveTable.ATTR_OUTPUT_TABLE, "newTable");
flowFile.assertAttributeEquals(UpdateHiveTable.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/newTable");
List<String> statements = service.getExecutedStatements();
assertEquals(1, statements.size());
// (the second argument of this assertion is not visible in this extract)
assertEquals("CREATE TABLE IF NOT EXISTS newTable (name STRING, favorite_number INT, favorite_color STRING, scale DOUBLE) STORED AS PARQUET",
/**
 * Targets the partitioned 'messages' table with static partition values "Asia,China". Expects success,
 * an output.path ending in continent=Asia/country=China, and two executed statements: an ADD COLUMNS for
 * all four record fields and an ADD IF NOT EXISTS PARTITION for the given values.
 */
public void testAddColumnsAndPartition() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHiveTable.TABLE_NAME, "messages");
final MockDBCPService service = new MockDBCPService("test");
runner.addControllerService("dbcp", service);
runner.setProperty(UpdateHiveTable.HIVE_DBCP_SERVICE, "dbcp");
runner.setProperty(UpdateHiveTable.STATIC_PARTITION_VALUES, "Asia,China");
runner.enqueue(new byte[0]);
runner.assertTransferCount(UpdateHiveTable.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHiveTable.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateHiveTable.ATTR_OUTPUT_TABLE, "messages");
flowFile.assertAttributeEquals(UpdateHiveTable.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/messages/continent=Asia/country=China");
List<String> statements = service.getExecutedStatements();
assertEquals(2, statements.size());
// All columns from users table/data should be added to the table, and a new partition should be added
// (the second argument of each assertion below is not visible in this extract)
assertEquals("ALTER TABLE messages ADD COLUMNS (name STRING, favorite_number INT, favorite_color STRING, scale DOUBLE)",
assertEquals("ALTER TABLE messages ADD IF NOT EXISTS PARTITION (continent='Asia', country='China')",
// Targets the "messages" table without setting STATIC_PARTITION_VALUES and expects the flow file
// to be routed to failure rather than success.
public void testMissingPartitionValues() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHiveTable.TABLE_NAME, "messages");
final DBCPService service = new MockDBCPService("test");
runner.addControllerService("dbcp", service);
runner.setProperty(UpdateHiveTable.HIVE_DBCP_SERVICE, "dbcp");
runner.enqueue(new byte[0]);
runner.assertTransferCount(UpdateHiveTable.REL_SUCCESS, 0);
runner.assertTransferCount(UpdateHiveTable.REL_FAILURE, 1);
// Test-local subclass of UpdateHiveTable; this is the processor type the tests instantiate.
private static final class MockUpdateHiveTable extends UpdateHiveTable {
* Simple implementation only for testing purposes
// Mock HiveDBCPService for tests: hands out Mockito-mocked JDBC objects whose query results are
// canned fixtures, and exposes a list of executed statements for assertions.
private static class MockDBCPService extends AbstractControllerService implements HiveDBCPService {
// Only used to build the fake connection URL
private final String dbLocation;
// Returned by getExecutedStatements() so tests can assert on the DDL that was issued
private final List<String> executedStatements = new ArrayList<>();
MockDBCPService(final String dbLocation) {
this.dbLocation = dbLocation;
// Fixed identifier; the tests register and reference the service under the same "dbcp" id
public String getIdentifier() {
return "dbcp";
// Builds a mocked Connection/Statement pair; executeQuery is answered by matching the exact query text
public Connection getConnection() throws ProcessException {
try {
Connection conn = mock(Connection.class);
Statement s = mock(Statement.class);
when(s.executeQuery(anyString())).thenAnswer((Answer<ResultSet>) invocation -> {
final String query = invocation.getArgument(0);
if ("SHOW TABLES".equals(query)) {
return new MockResultSet(SHOW_TABLES_COLUMN_NAMES, SHOW_TABLES_RESULTSET).createResultSet();
// NOTE(review): no result is visible for the next two branches - confirm each returns its matching DESC fixture
} else if ("DESC FORMATTED messages".equals(query)) {
} else if ("DESC FORMATTED users".equals(query)) {
} else if ("DESC FORMATTED newTable".equals(query)) {
return new MockResultSet(DESC_NEW_TABLE_COLUMN_NAMES, DESC_NEW_TABLE_RESULTSET).createResultSet();
} else {
// Any other query yields a result set holding a single empty row
return new MockResultSet(new String[]{}, new String[][]{new String[]{}}).createResultSet();
// execute() is stubbed to answer false for any SQL string
when(s.execute(anyString())).thenAnswer((Answer<Boolean>) invocation -> {
return false;
return conn;
} catch (final Exception e) {
// NOTE(review): only the exception's string form is appended; the exception is not passed as the cause
throw new ProcessException("getConnection failed: " + e);
// Fake JDBC URL derived from the constructor argument
public String getConnectionURL() {
return "jdbc:fake:" + dbLocation;
// Returns the live internal list, not a copy
List<String> getExecutedStatements() {
return executedStatements;
// Array-backed stand-in for a JDBC ResultSet: createResultSet() returns a Mockito mock whose
// next() and getString(int) are answered from the rows supplied to the constructor.
private static class MockResultSet {
// Stored by the constructor; not read by the mock answers below
String[] colNames;
// One String[] per row
String[][] data;
// Cursor position: 0 before the first next(), then the 1-based number of the current row
int currentRow;
MockResultSet(String[] colNames, String[][] data) {
this.colNames = colNames;
this.data = data;
currentRow = 0;
// The cursor lives on this MockResultSet instance, so every mock created from it shares one position
ResultSet createResultSet() throws SQLException {
ResultSet rs = mock(ResultSet.class);
// Advances the cursor on every call; true while the new position is still within data
when(rs.next()).thenAnswer((Answer<Boolean>) invocation -> (data != null) && (++currentRow <= data.length));
when(rs.getString(anyInt())).thenAnswer((Answer<String>) invocation -> {
final int index = invocation.getArgument(0);
if (index < 1) {
throw new SQLException("Columns start with index 1");
if (currentRow > data.length) {
throw new SQLException("This result set is already closed");
// NOTE(review): if getString is called before next(), currentRow is 0 and this indexes data[-1]
return data[currentRow - 1][index - 1];
return rs;
@ -0,0 +1,482 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.hive;
import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.dbcp.hive.Hive3DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.DiscontinuedException;
import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
// Processor that compares the schema of incoming records with a Hive table over JDBC and issues
// the DDL needed for the table to accept those records; it writes the output.table and
// output.path attributes described below.
@Tags({"hive", "metadata", "jdbc", "database", "table"})
@CapabilityDescription("This processor uses a Hive JDBC connection and incoming records to generate any Hive 3.0+ table changes needed to support the incoming records.")
@WritesAttribute(attribute = "output.table", description = "This attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the target table name."),
@WritesAttribute(attribute = "output.path", description = "This attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the path on the file system to the table (or partition location if the table is partitioned).")
public class UpdateHive3Table extends AbstractProcessor {
// Storage format keywords; each is used as both the value and the display name of the AllowableValue below
static final String TEXTFILE = "TEXTFILE";
static final String SEQUENCEFILE = "SEQUENCEFILE";
static final String ORC = "ORC";
static final String PARQUET = "PARQUET";
static final String AVRO = "AVRO";
static final String RCFILE = "RCFILE";
// Allowable values wrapping the storage format keywords above
static final AllowableValue TEXTFILE_STORAGE = new AllowableValue(TEXTFILE, TEXTFILE, "Stored as plain text files. TEXTFILE is the default file format, unless the configuration "
+ "parameter hive.default.fileformat has a different setting.");
static final AllowableValue SEQUENCEFILE_STORAGE = new AllowableValue(SEQUENCEFILE, SEQUENCEFILE, "Stored as compressed Sequence Files.");
static final AllowableValue ORC_STORAGE = new AllowableValue(ORC, ORC, "Stored as ORC file format. Supports ACID Transactions & Cost-based Optimizer (CBO). "
+ "Stores column-level metadata.");
static final AllowableValue PARQUET_STORAGE = new AllowableValue(PARQUET, PARQUET, "Stored as Parquet format for the Parquet columnar storage format.");
static final AllowableValue AVRO_STORAGE = new AllowableValue(AVRO, AVRO, "Stored as Avro format.");
static final AllowableValue RCFILE_STORAGE = new AllowableValue(RCFILE, RCFILE, "Stored as Record Columnar File format.");
// Strategies for a target table that does not exist; onTrigger compares the configured value with CREATE_IF_NOT_EXISTS
static final AllowableValue CREATE_IF_NOT_EXISTS = new AllowableValue("Create If Not Exists", "Create If Not Exists",
"Create a table with the given schema if it does not already exist");
static final AllowableValue FAIL_IF_NOT_EXISTS = new AllowableValue("Fail If Not Exists", "Fail If Not Exists",
"If the target does not already exist, log an error and route the flowfile to failure");
// Names of the flow file attributes written by this processor
static final String ATTR_OUTPUT_TABLE = "output.table";
static final String ATTR_OUTPUT_PATH = "output.path";
// Properties
// Reader service; onTrigger uses it only to obtain the record schema
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.displayName("Record Reader")
.description("The service for reading incoming flow files. The reader is only used to determine the schema of the records, the actual records will not be processed.")
// Controller service from which onTrigger obtains its JDBC connection
static final PropertyDescriptor HIVE_DBCP_SERVICE = new PropertyDescriptor.Builder()
.displayName("Hive Database Connection Pooling Service")
.description("The Hive Controller Service that is used to obtain connection(s) to the Hive database")
// Target table; onTrigger evaluates it against the flow file's attributes
static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.displayName("Table Name")
.description("The name of the database table to update. If the table does not exist, then it will either be created or an error thrown, depending "
+ "on the value of the Create Table property.")
// What to do when the target table is missing (see CREATE_IF_NOT_EXISTS / FAIL_IF_NOT_EXISTS)
static final PropertyDescriptor CREATE_TABLE = new PropertyDescriptor.Builder()
.displayName("Create Table Strategy")
.description("Specifies how to process the target table when it does not exist (create it, fail, e.g.).")
// Storage format appended after STORED AS when a table is created
static final PropertyDescriptor TABLE_STORAGE_FORMAT = new PropertyDescriptor.Builder()
.displayName("Create Table Storage Format")
.description("If a table is to be created, the specified storage format will be used.")
// Query timeout in seconds, where 0 means no timeout (per the description)
static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
.displayName("Query timeout")
.description("Sets the number of seconds the driver will wait for a query to execute. "
+ "A value of 0 means no timeout. NOTE: Non-zero values may not be supported by the driver.")
// Comma-separated partition values; onTrigger evaluates them against flow file attributes, then splits and trims
// NOTE(review): the description mentions a "partition.location" value, but the attribute this class writes is output.path - confirm wording
static final PropertyDescriptor STATIC_PARTITION_VALUES = new PropertyDescriptor.Builder()
.displayName("Static Partition Values")
.description("Specifies a comma-separated list of the values for the partition columns of the target table. This assumes all incoming records belong to the same partition "
+ "and the partition columns are not fields in the record. If specified, this property will often contain "
+ "Expression Language. For example if PartitionRecord is upstream and two partition columns 'name' and 'age' are used, then this property can be set to "
+ "${name},${age}. This property must be set if the table is partitioned, and must not be set if the table is not partitioned. If this property is set, the values "
+ "will be used as the partition values, and the partition.location value will reflect the location of the partition in the filesystem (for use downstream in "
+ "processors like PutHDFS).")
// Relationships
// Destination for flow files whose table check/update completed without error
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.description("A FlowFile containing records routed to this relationship after the record has been successfully transmitted to Hive.")
// Destination for flow files whose reader could not be created or whose table update raised an IOException/SQLException
public static final Relationship REL_FAILURE = new Relationship.Builder()
.description("A FlowFile containing records routed to this relationship if the record could not be transmitted to Hive.")
// Unmodifiable views assigned in init() and returned by the getters
private List<PropertyDescriptor> propertyDescriptors;
private Set<Relationship> relationships;
// Builds the property and relationship collections once and stores unmodifiable views of them.
protected void init(ProcessorInitializationContext context) {
List<PropertyDescriptor> props = new ArrayList<>();
// NOTE(review): no statements populating props or _relationships are visible here - confirm they are filled before wrapping
propertyDescriptors = Collections.unmodifiableList(props);
Set<Relationship> _relationships = new HashSet<>();
relationships = Collections.unmodifiableSet(_relationships);
// Returns the unmodifiable property list assigned in init().
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
// Returns the unmodifiable relationship set assigned in init().
public Set<Relationship> getRelationships() {
return relationships;
// Handles one flow file: reads its record schema, checks/updates the target Hive table over a
// JDBC connection, tags the flow file with output.table, and routes it to success or failure.
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
// Table name and partition values are evaluated against this flow file's attributes
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String staticPartitionValuesString = context.getProperty(STATIC_PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
// Stays null when the property is empty; checkAndUpdateTableSchema treats null as "no values supplied"
List<String> staticPartitionValues = null;
if (!StringUtils.isEmpty(staticPartitionValuesString)) {
// Split on commas and trim each value
staticPartitionValues = Arrays.stream(staticPartitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
final ComponentLog log = getLogger();
try {
final RecordReader reader;
try (final InputStream in = session.read(flowFile)) {
// if we fail to create the RecordReader then we want to route to failure, so we need to
// handle this separately from the other IOExceptions which normally route to retry
try {
reader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
} catch (Exception e) {
// Any reader-creation failure is rewrapped so the dedicated catch below can route to failure
throw new RecordReaderFactoryException("Unable to create RecordReader", e);
} catch (RecordReaderFactoryException rrfe) {
"Failed to create {} for {} - routing to failure",
new Object[]{RecordReader.class.getSimpleName(), flowFile},
session.transfer(flowFile, REL_FAILURE);
// Only the schema is taken from the reader; no records are read in this method
RecordSchema recordSchema = reader.getSchema();
final boolean createIfNotExists = context.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
final String storageFormat = context.getProperty(TABLE_STORAGE_FORMAT).getValue();
final Hive3DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive3DBCPService.class);
// The connection is closed by try-with-resources once the schema check is done
try (final Connection connection = dbcpService.getConnection()) {
checkAndUpdateTableSchema(session, flowFile, connection, recordSchema, tableName, staticPartitionValues, createIfNotExists, storageFormat);
flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName);
// Provenance event naming the JDBC URL as the remote system
session.getProvenanceReporter().invokeRemoteProcess(flowFile, dbcpService.getConnectionURL());
session.transfer(flowFile, REL_SUCCESS);
} catch (IOException | SQLException e) {
// The table name attribute is also written on the failure path
flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName);
"Exception while processing {} - routing to failure",
new Object[]{flowFile},
session.transfer(flowFile, REL_FAILURE);
} catch (DiscontinuedException e) {
// The input FlowFile processing is discontinued. Keep it in the input queue.
getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
session.transfer(flowFile, Relationship.SELF);
} catch (Throwable t) {
// A ProcessException is rethrown as-is; anything else is wrapped in a new ProcessException
throw (t instanceof ProcessException) ? (ProcessException) t : new ProcessException(t);
/**
 * Compares the record schema with the Hive table and issues the DDL needed to align them, then
 * writes the table (or partition) location to the flow file's output.path attribute.
 * <p>
 * If the table is not listed by SHOW TABLES and createIfNotExists is true, a
 * CREATE TABLE IF NOT EXISTS statement is built from the record fields. Otherwise the table is
 * described with DESC FORMATTED, its columns and partition columns are collected, and
 * ALTER TABLE statements are built for missing columns and for the requested partition.
 * The method is declared synchronized, so calls on the same processor instance do not overlap.
 *
 * @param session           session used to set the output.path attribute
 * @param flowFile          flow file receiving the attribute
 * @param conn              JDBC connection to Hive; the Statement created from it is closed on exit
 * @param schema            record schema whose fields are compared with the table columns
 * @param tableName         target table name, concatenated directly into the DDL
 * @param partitionValues   static partition values, or null when none were supplied
 * @param createIfNotExists whether a missing table should be created
 * @param storageFormat     storage format used when creating a table
 * @throws IOException if the partition values are missing or too few, or wrapping any other exception raised here
 */
private synchronized void checkAndUpdateTableSchema(final ProcessSession session, final FlowFile flowFile, final Connection conn, final RecordSchema schema,
final String tableName, final List<String> partitionValues,
final boolean createIfNotExists, final String storageFormat) throws IOException {
// Read in the current table metadata, compare it to the reader's schema, and
// add any columns from the schema that are missing in the table
try (Statement s = conn.createStatement()) {
// Determine whether the table exists
ResultSet tables = s.executeQuery("SHOW TABLES");
List<String> tableNames = new ArrayList<>();
String hiveTableName;
// Iterates until the rows run out or the first column is empty
while (tables.next() && StringUtils.isNotEmpty(hiveTableName = tables.getString(1))) {
List<String> columnsToAdd = new ArrayList<>();
String outputPath;
// List.contains is an exact, case-sensitive match against the names returned by Hive
if (!tableNames.contains(tableName) && createIfNotExists) {
StringBuilder createTableStatement = new StringBuilder();
for (RecordField recordField : schema.getFields()) {
// NOTE(review): the field name keeps its original case here, while the ALTER path below lower-cases it - confirm intended
String recordFieldName = recordField.getFieldName();
// The field does not exist in the table, add it
columnsToAdd.add(recordFieldName + " " + NiFiOrcUtils.getHiveTypeFromFieldType(recordField.getDataType(), true));
getLogger().debug("Adding column " + recordFieldName + " to table " + tableName);
createTableStatement.append("CREATE TABLE IF NOT EXISTS ")
.append(" (")
.append(String.join(", ", columnsToAdd))
.append(") STORED AS ")
String createTableSql = createTableStatement.toString();
if (StringUtils.isNotEmpty(createTableSql)) {
// Perform the table create
getLogger().info("Executing Hive DDL: " + createTableSql);
// Now that the table is created, describe it and determine its location (for placing the flowfile downstream)
String describeTable = "DESC FORMATTED " + tableName;
ResultSet tableInfo = s.executeQuery(describeTable);
boolean moreRows = tableInfo.next();
boolean locationFound = false;
// Scan rows until one whose first column starts with "Location:"
while (moreRows && !locationFound) {
String line = tableInfo.getString(1);
// NOTE(review): line.startsWith would throw a NullPointerException if the first column is null - confirm Hive never returns null here
if (line.startsWith("Location:")) {
locationFound = true;
continue; // Don't do a next() here, need to get the second column value
moreRows = tableInfo.next();
// The location is in the second column of the row the scan stopped on
outputPath = tableInfo.getString(2);
} else {
List<String> hiveColumns = new ArrayList<>();
String describeTable = "DESC FORMATTED " + tableName;
ResultSet tableInfo = s.executeQuery(describeTable);
// Result is 3 columns, col_name, data_type, comment. Check the first row for a header and skip if so, otherwise add column name
String columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) {
// If the column was a header, check for a blank line to follow and skip it, otherwise add the column name
if (columnName.startsWith("#")) {
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName)) {
// Collect all column names
while (tableInfo.next() && StringUtils.isNotEmpty(columnName = tableInfo.getString(1))) {
// Collect all partition columns
boolean moreRows = true;
boolean headerFound = false;
while (moreRows && !headerFound) {
String line = tableInfo.getString(1);
if ("# Partition Information".equals(line)) {
headerFound = true;
} else if ("# Detailed Table Information".equals(line)) {
// Not partitioned, exit the loop with headerFound = false
moreRows = tableInfo.next();
List<String> partitionColumns = new ArrayList<>();
// Quoted column='value' pairs for the ADD PARTITION clause
List<String> partitionColumnsEqualsValueList = new ArrayList<>();
// Unquoted column=value pairs used as directory names in the output path
List<String> partitionColumnsLocationList = new ArrayList<>();
if (headerFound) {
// If the table is partitioned, construct the partition=value strings for each partition column
String partitionColumnName;
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) {
// If the column was a header, check for a blank line to follow and skip it, otherwise add the column name
if (columnName.startsWith("#")) {
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName)) {
while (tableInfo.next() && StringUtils.isNotEmpty(partitionColumnName = tableInfo.getString(1))) {
final int partitionColumnsSize = partitionColumns.size();
// A partitioned table needs at least one value per partition column
if (partitionValues == null) {
throw new IOException("Found " + partitionColumnsSize + " partition columns but no Static Partition Values were supplied");
final int partitionValuesSize = partitionValues.size();
// Extra values beyond the number of partition columns are accepted; the loop below ignores them
if (partitionValuesSize < partitionColumnsSize) {
throw new IOException("Found " + partitionColumnsSize + " partition columns but only " + partitionValuesSize + " Static Partition Values were supplied");
for (int i = 0; i < partitionColumns.size(); i++) {
// NOTE(review): the value is placed between single quotes without escaping; a value containing ' would alter the DDL - confirm inputs are trusted
partitionColumnsEqualsValueList.add(partitionColumns.get(i) + "='" + partitionValues.get(i) + "'");
// Add unquoted version for the output path
partitionColumnsLocationList.add(partitionColumns.get(i) + "=" + partitionValues.get(i));
// Get table location
moreRows = true;
headerFound = false;
while (moreRows && !headerFound) {
String line = tableInfo.getString(1);
// NOTE(review): same null-dereference risk on line as in the create branch above
if (line.startsWith("Location:")) {
headerFound = true;
continue; // Don't do a next() here, need to get the second column value
moreRows = tableInfo.next();
String tableLocation = tableInfo.getString(2);
StringBuilder alterTableStatement = new StringBuilder();
// Handle new columns
for (RecordField recordField : schema.getFields()) {
// Lower-cased before comparing with the column names read from DESC FORMATTED
String recordFieldName = recordField.getFieldName().toLowerCase();
if (!hiveColumns.contains(recordFieldName) && !partitionColumns.contains(recordFieldName)) {
// The field does not exist in the table (and is not a partition column), add it
columnsToAdd.add(recordFieldName + " " + NiFiOrcUtils.getHiveTypeFromFieldType(recordField.getDataType(), true));
getLogger().info("Adding column " + recordFieldName + " to table " + tableName);
String alterTableSql;
if (!columnsToAdd.isEmpty()) {
alterTableStatement.append("ALTER TABLE ")
.append(" ADD COLUMNS (")
.append(String.join(", ", columnsToAdd))
alterTableSql = alterTableStatement.toString();
if (StringUtils.isNotEmpty(alterTableSql)) {
// Perform the table update
getLogger().info("Executing Hive DDL: " + alterTableSql);
// Default output path is the table location; replaced below when partition values apply
outputPath = tableLocation;
// Handle new partitions
if (!partitionColumnsEqualsValueList.isEmpty()) {
alterTableSql = "ALTER TABLE " +
tableName +
String.join(", ", partitionColumnsEqualsValueList) +
if (StringUtils.isNotEmpty(alterTableSql)) {
// Perform the table update
getLogger().info("Executing Hive DDL: " + alterTableSql);
// Add attribute for HDFS location of the partition values
outputPath = tableLocation + "/" + String.join("/", partitionColumnsLocationList);
// NOTE(review): the FlowFile returned by putAttribute is discarded - confirm the caller's reference still observes this attribute
session.putAttribute(flowFile, ATTR_OUTPUT_PATH, outputPath);
} catch (Exception e) {
// Every failure, including the IOExceptions thrown above, leaves this method wrapped in a new IOException
throw new IOException(e);
@ -16,3 +16,4 @@ org.apache.nifi.processors.hive.SelectHive3QL
@ -0,0 +1,376 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.hive;
import org.apache.avro.Schema;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.dbcp.hive.Hive3DBCPService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.stubbing.Answer;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
// Tests for UpdateHive3Table, run against a mock DBCP service that answers SHOW TABLES and
// DESC FORMATTED queries from the canned fixtures below.
public class TestUpdateHive3Table {
private static final String TEST_CONF_PATH = "src/test/resources/core-site.xml";
private static final String TARGET_HIVE = "target/hive";
// Fixture for SHOW TABLES: the mock database contains "messages" and "users"
private static final String[] SHOW_TABLES_COLUMN_NAMES = new String[]{"tab_name"};
private static final String[][] SHOW_TABLES_RESULTSET = new String[][]{
new String[]{"messages"},
new String[]{"users"},
// Fixture for DESC FORMATTED messages: columns id/msg with a header row, partitioned by continent and country
private static final String[] DESC_MESSAGES_TABLE_COLUMN_NAMES = new String[]{"id", "msg"};
private static final String[][] DESC_MESSAGES_TABLE_RESULTSET = new String[][]{
new String[]{"# col_name", "data_type", "comment"},
new String[]{"id", "int", ""},
new String[]{"msg", "string", ""},
new String[]{"", null, null},
new String[]{"# Partition Information", null, null},
new String[]{"# col_name", "data_type", "comment"},
new String[]{"continent", "string", ""},
new String[]{"country", "string", ""},
new String[]{"", null, null},
new String[]{"# Detailed Table Information", null, null},
new String[]{"Location:", "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/messages", null}
// Fixture for DESC FORMATTED users: no leading header row and no partition section
private static final String[] DESC_USERS_TABLE_COLUMN_NAMES = new String[]{"name", "favorite_number", "favorite_color", "scale"};
private static final String[][] DESC_USERS_TABLE_RESULTSET = new String[][]{
new String[]{"name", "string", ""},
new String[]{"favorite_number", "int", ""},
new String[]{"favorite_color", "string", ""},
new String[]{"scale", "double", ""},
new String[]{"", null, null},
new String[]{"# Detailed Table Information", null, null},
new String[]{"Location:", "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/users", null}
// Fixture for DESC FORMATTED newTable: the same four columns as users, preceded by a header row
private static final String[][] DESC_NEW_TABLE_RESULTSET = new String[][]{
new String[]{"# col_name", "data_type", "comment"},
new String[]{"name", "string", ""},
new String[]{"favorite_number", "int", ""},
new String[]{"favorite_color", "string", ""},
new String[]{"scale", "double", ""},
new String[]{"", null, null},
new String[]{"# Detailed Table Information", null, null},
new String[]{"Location:", "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/newTable", null}
// Temporary directory; testSetup derives a "db" path under its root
public TemporaryFolder folder = new TemporaryFolder();
// Created by configure() for each test
private TestRunner runner;
// Created in setUp()
private MockUpdateHive3Table processor;
// Avro schema parsed from user.avsc in setUp(); configure() converts it to the record schema
private Schema schema;
// Test fixture setup: parses the Avro user schema, clears target/hive, and creates a fresh processor.
public void setUp() throws Exception {
final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/user.avsc"), StandardCharsets.UTF_8);
schema = new Schema.Parser().parse(avroSchema);
// NOTE(review): testConf is not referenced again in the visible code - confirm it is still needed
Configuration testConf = new Configuration();
testConf.addResource(new Path(TEST_CONF_PATH));
// Delete any temp files from previous tests
try {
FileUtils.deleteDirectory(new File(TARGET_HIVE));
} catch (IOException ioe) {
// Do nothing, directory may not have existed
processor = new MockUpdateHive3Table();
// Convenience overload: reader creation does not fail and failAfter is -1.
private void configure(final UpdateHive3Table processor, final int numUsers) throws InitializationException {
configure(processor, numUsers, false, -1);
// Convenience overload: uses the default record generator (null).
private void configure(final UpdateHive3Table processor, final int numUsers, boolean failOnCreateReader, int failAfter) throws InitializationException {
configure(processor, numUsers, failOnCreateReader, failAfter, null);
// Creates the TestRunner for the processor and registers a MockRecordParser as its record reader.
// The parser's schema mirrors the Avro user schema; records come from recordGenerator when given,
// otherwise numUsers synthetic rows are added.
// NOTE(review): failAfter is not used in the visible body - confirm whether it should be passed to the parser
private void configure(final UpdateHive3Table processor, final int numUsers, final boolean failOnCreateReader, final int failAfter,
final BiFunction<Integer, MockRecordParser, Void> recordGenerator) throws InitializationException {
runner = TestRunners.newTestRunner(processor);
MockRecordParser readerFactory = new MockRecordParser() {
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException {
// Simulates a reader factory failure when requested by the test
if (failOnCreateReader) {
throw new SchemaNotFoundException("test");
return super.createRecordReader(variables, in, inputLength, logger);
// Copy every field of the Avro-derived schema (name, type, nullability) into the mock parser
final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
for (final RecordField recordField : recordSchema.getFields()) {
readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
if (recordGenerator == null) {
for (int i = 0; i < numUsers; i++) {
// Four values per record
readerFactory.addRecord("name" + i, i, "blue" + i, i * 10.0);
} else {
recordGenerator.apply(numUsers, readerFactory);
runner.addControllerService("mock-reader-factory", readerFactory);
runner.setProperty(UpdateHive3Table.RECORD_READER, "mock-reader-factory");
// Wires the runner with a reader holding zero records, a mock DBCP service rooted under the
// temporary folder, and the "users" table name.
public void testSetup() throws Exception {
configure(processor, 0);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
runner.setProperty(UpdateHive3Table.TABLE_NAME, "users");
// Targets the existing "users" table, whose fixture already lists every field of the record
// schema, and expects success with output.path equal to the plain table location.
public void testNoStatementsExecuted() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHive3Table.TABLE_NAME, "users");
final MockDBCPService service = new MockDBCPService("test");
runner.addControllerService("dbcp", service);
runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
// Partition values are supplied although the users fixture has no partition section
runner.setProperty(UpdateHive3Table.STATIC_PARTITION_VALUES, "Asia,China");
runner.enqueue(new byte[0]);
runner.assertTransferCount(UpdateHive3Table.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive3Table.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_TABLE, "users");
// No partition directories are appended to the location
flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/users");
// Resolves the table name from the "table.name" attribute to "newTable", which is absent from the
// SHOW TABLES fixture, and expects a single CREATE TABLE statement stored as PARQUET.
public void testCreateTable() throws Exception {
configure(processor, 1);
// Expression Language: resolved from the flow file attributes enqueued below
runner.setProperty(UpdateHive3Table.TABLE_NAME, "${table.name}");
runner.setProperty(UpdateHive3Table.CREATE_TABLE, UpdateHive3Table.CREATE_IF_NOT_EXISTS);
runner.setProperty(UpdateHive3Table.TABLE_STORAGE_FORMAT, UpdateHive3Table.PARQUET);
final MockDBCPService service = new MockDBCPService("newTable");
runner.addControllerService("dbcp", service);
runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
Map<String, String> attrs = new HashMap<>();
attrs.put("db.name", "default");
attrs.put("table.name", "newTable");
runner.enqueue(new byte[0], attrs);
runner.assertTransferCount(UpdateHive3Table.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive3Table.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_TABLE, "newTable");
// Matches the Location row of the newTable DESC fixture
flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/newTable");
List<String> statements = service.getExecutedStatements();
assertEquals(1, statements.size());
assertEquals("CREATE TABLE IF NOT EXISTS newTable (name STRING, favorite_number INT, favorite_color STRING, scale DOUBLE) STORED AS PARQUET",
// Targets the partitioned "messages" table, whose fixture has only id/msg columns, and expects
// two statements: an ADD COLUMNS for the four record fields and an ADD PARTITION for Asia/China.
public void testAddColumnsAndPartition() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHive3Table.TABLE_NAME, "messages");
final MockDBCPService service = new MockDBCPService("test");
runner.addControllerService("dbcp", service);
runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
// One value per partition column of the messages fixture (continent, country)
runner.setProperty(UpdateHive3Table.STATIC_PARTITION_VALUES, "Asia,China");
runner.enqueue(new byte[0]);
runner.assertTransferCount(UpdateHive3Table.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive3Table.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_TABLE, "messages");
// Table location followed by one "column=value" directory per partition column
flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/messages/continent=Asia/country=China");
List<String> statements = service.getExecutedStatements();
assertEquals(2, statements.size());
// All columns from users table/data should be added to the table, and a new partition should be added
assertEquals("ALTER TABLE messages ADD COLUMNS (name STRING, favorite_number INT, favorite_color STRING, scale DOUBLE)",
assertEquals("ALTER TABLE messages ADD IF NOT EXISTS PARTITION (continent='Asia', country='China')",
// Targets the "messages" table without setting STATIC_PARTITION_VALUES and expects the FlowFile
// to be routed to failure (presumably because the mocked table is partitioned — confirm against the fixture).
public void testMissingPartitionValues() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHive3Table.TABLE_NAME, "messages");
final DBCPService service = new MockDBCPService("test");
runner.addControllerService("dbcp", service);
runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
runner.enqueue(new byte[0]);
runner.assertTransferCount(UpdateHive3Table.REL_SUCCESS, 0);
runner.assertTransferCount(UpdateHive3Table.REL_FAILURE, 1);
private static final class MockUpdateHive3Table extends UpdateHive3Table {
* Simple implementation only for testing purposes
// Test double for the Hive 3 connection pool: hands out Mockito-mocked JDBC objects whose
// executeQuery() answers are canned result sets selected by the exact query text.
private static class MockDBCPService extends AbstractControllerService implements Hive3DBCPService {
// Only used to build the fake connection URL
private final String dbLocation;
// Exposed to tests through getExecutedStatements()
private final List<String> executedStatements = new ArrayList<>();
MockDBCPService(final String dbLocation) {
this.dbLocation = dbLocation;
// Fixed identifier; the tests register and reference this service as "dbcp"
public String getIdentifier() {
return "dbcp";
// Builds a mocked Connection/Statement pair; wraps any failure in a ProcessException
public Connection getConnection() throws ProcessException {
try {
Connection conn = mock(Connection.class);
Statement s = mock(Statement.class);
when(s.executeQuery(anyString())).thenAnswer((Answer<ResultSet>) invocation -> {
final String query = invocation.getArgument(0);
// Dispatch on the literal query string to pick the canned result set
if ("SHOW TABLES".equals(query)) {
return new MockResultSet(SHOW_TABLES_COLUMN_NAMES, SHOW_TABLES_RESULTSET).createResultSet();
} else if ("DESC FORMATTED messages".equals(query)) {
} else if ("DESC FORMATTED users".equals(query)) {
} else if ("DESC FORMATTED newTable".equals(query)) {
return new MockResultSet(DESC_NEW_TABLE_COLUMN_NAMES, DESC_NEW_TABLE_RESULTSET).createResultSet();
} else {
// Unknown query: result set with no column names and a single empty row
return new MockResultSet(new String[]{}, new String[][]{new String[]{}}).createResultSet();
when(s.execute(anyString())).thenAnswer((Answer<Boolean>) invocation -> {
return false;
return conn;
} catch (final Exception e) {
// NOTE(review): the exception is only concatenated into the message, so its stack trace is lost;
// consider passing e as the cause.
throw new ProcessException("getConnection failed: " + e);
public String getConnectionURL() {
return "jdbc:fake:" + dbLocation;
// Returns the live list (not a copy) of recorded statements
List<String> getExecutedStatements() {
return executedStatements;
// Minimal in-memory result set: a 2-D String array walked by a 1-based row cursor and exposed
// through a Mockito-mocked java.sql.ResultSet that only stubs next() and getString(int).
private static class MockResultSet {
String[] colNames;
String[][] data;
// 0 means "before the first row"; incremented by every next() call
int currentRow;
MockResultSet(String[] colNames, String[][] data) {
this.colNames = colNames;
this.data = data;
currentRow = 0;
ResultSet createResultSet() throws SQLException {
ResultSet rs = mock(ResultSet.class);
// next() advances the cursor and reports false once it has moved past the last row (or when data is null)
when(rs.next()).thenAnswer((Answer<Boolean>) invocation -> (data != null) && (++currentRow <= data.length));
when(rs.getString(anyInt())).thenAnswer((Answer<String>) invocation -> {
final int index = invocation.getArgument(0);
// JDBC column indexes are 1-based
if (index < 1) {
throw new SQLException("Columns start with index 1");
// Cursor has run past the end of the data
if (currentRow > data.length) {
throw new SQLException("This result set is already closed");
// NOTE(review): calling getString() before the first next() leaves currentRow at 0 and indexes data[-1]
return data[currentRow - 1][index - 1];
return rs;
@ -67,6 +67,10 @@
@ -101,6 +105,11 @@
@ -117,5 +126,11 @@
@ -0,0 +1,567 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.hive;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.dbcp.hive.Hive_1_1DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.DiscontinuedException;
import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@Tags({"hive", "metadata", "jdbc", "database", "table"})
@CapabilityDescription("This processor uses a Hive JDBC connection and incoming records to generate any Hive 1.1 table changes needed to support the incoming records.")
@WritesAttribute(attribute = "output.table", description = "This attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the target table name."),
@WritesAttribute(attribute = "output.path", description = "This attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the path on the file system to the table (or partition location if the table is partitioned).")
public class UpdateHive_1_1Table extends AbstractProcessor {
// Storage format keywords; each is both the value and the display name of the matching AllowableValue below
static final String TEXTFILE = "TEXTFILE";
static final String SEQUENCEFILE = "SEQUENCEFILE";
static final String ORC = "ORC";
static final String PARQUET = "PARQUET";
static final String AVRO = "AVRO";
static final String RCFILE = "RCFILE";
// Allowable values for the storage format, each with a user-facing description
static final AllowableValue TEXTFILE_STORAGE = new AllowableValue(TEXTFILE, TEXTFILE, "Stored as plain text files. TEXTFILE is the default file format, unless the configuration "
+ "parameter hive.default.fileformat has a different setting.");
static final AllowableValue SEQUENCEFILE_STORAGE = new AllowableValue(SEQUENCEFILE, SEQUENCEFILE, "Stored as compressed Sequence Files.");
static final AllowableValue ORC_STORAGE = new AllowableValue(ORC, ORC, "Stored as ORC file format. Supports ACID Transactions & Cost-based Optimizer (CBO). "
+ "Stores column-level metadata.");
static final AllowableValue PARQUET_STORAGE = new AllowableValue(PARQUET, PARQUET, "Stored as Parquet format for the Parquet columnar storage format.");
static final AllowableValue AVRO_STORAGE = new AllowableValue(AVRO, AVRO, "Stored as Avro format.");
static final AllowableValue RCFILE_STORAGE = new AllowableValue(RCFILE, RCFILE, "Stored as Record Columnar File format.");
// Strategies for a target table that does not exist; onTrigger compares the configured value with CREATE_IF_NOT_EXISTS
static final AllowableValue CREATE_IF_NOT_EXISTS = new AllowableValue("Create If Not Exists", "Create If Not Exists",
"Create a table with the given schema if it does not already exist");
static final AllowableValue FAIL_IF_NOT_EXISTS = new AllowableValue("Fail If Not Exists", "Fail If Not Exists",
"If the target does not already exist, log an error and route the flowfile to failure");
// Names of the FlowFile attributes written by this processor
static final String ATTR_OUTPUT_TABLE = "output.table";
static final String ATTR_OUTPUT_PATH = "output.path";
// Properties
// Record reader controller service; per its description it is used only to obtain the record schema
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.displayName("Record Reader")
.description("The service for reading incoming flow files. The reader is only used to determine the schema of the records, the actual records will not be processed.")
// Controller service that supplies the JDBC connection to Hive
static final PropertyDescriptor HIVE_DBCP_SERVICE = new PropertyDescriptor.Builder()
.displayName("Hive Database Connection Pooling Service")
.description("The Hive Controller Service that is used to obtain connection(s) to the Hive database")
// Target table; evaluated against FlowFile attributes in onTrigger
// NOTE(review): the text says "Create Table property" while that property is displayed as "Create Table Strategy"
static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.displayName("Table Name")
.description("The name of the database table to update. If the table does not exist, then it will either be created or an error thrown, depending "
+ "on the value of the Create Table property.")
// What to do when the target table is missing
static final PropertyDescriptor CREATE_TABLE = new PropertyDescriptor.Builder()
.displayName("Create Table Strategy")
.description("Specifies how to process the target table when it does not exist (create it, fail, e.g.).")
// Storage format placed after STORED AS in a generated CREATE TABLE statement
static final PropertyDescriptor TABLE_STORAGE_FORMAT = new PropertyDescriptor.Builder()
.displayName("Create Table Storage Format")
.description("If a table is to be created, the specified storage format will be used.")
// Statement timeout in seconds
static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
.displayName("Query timeout")
.description("Sets the number of seconds the driver will wait for a query to execute. "
+ "A value of 0 means no timeout. NOTE: Non-zero values may not be supported by the driver.")
// Comma-separated partition values; split on ',' and trimmed in onTrigger
// NOTE(review): the description refers to a "partition.location" value, but the attribute this processor
// writes is ATTR_OUTPUT_PATH ("output.path") — confirm and align the wording.
static final PropertyDescriptor STATIC_PARTITION_VALUES = new PropertyDescriptor.Builder()
.displayName("Static Partition Values")
.description("Specifies a comma-separated list of the values for the partition columns of the target table. This assumes all incoming records belong to the same partition "
+ "and the partition columns are not fields in the record. If specified, this property will often contain "
+ "Expression Language. For example if PartitionRecord is upstream and two partition columns 'name' and 'age' are used, then this property can be set to "
+ "${name},${age}. This property must be set if the table is partitioned, and must not be set if the table is not partitioned. If this property is set, the values "
+ "will be used as the partition values, and the partition.location value will reflect the location of the partition in the filesystem (for use downstream in "
+ "processors like PutHDFS).")
// Relationships
// NOTE(review): both descriptions talk about records being "transmitted to Hive", whereas onTrigger only
// reads the schema and issues DDL — the wording looks carried over from another processor; confirm.
// Destination for FlowFiles whose table check/update completed
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.description("A FlowFile containing records routed to this relationship after the record has been successfully transmitted to Hive.")
// Destination for FlowFiles whose reader could not be created or whose processing raised IOException/SQLException
public static final Relationship REL_FAILURE = new Relationship.Builder()
.description("A FlowFile containing records routed to this relationship if the record could not be transmitted to Hive.")
// Assigned in init() as unmodifiable collections and returned by the accessors below
private List<PropertyDescriptor> propertyDescriptors;
private Set<Relationship> relationships;
// Builds the supported property list and relationship set and stores them as unmodifiable views
protected void init(ProcessorInitializationContext context) {
List<PropertyDescriptor> props = new ArrayList<>();
propertyDescriptors = Collections.unmodifiableList(props);
Set<Relationship> _relationships = new HashSet<>();
relationships = Collections.unmodifiableSet(_relationships);
// Returns the unmodifiable property list assigned in init()
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
// Returns the unmodifiable relationship set assigned in init()
public Set<Relationship> getRelationships() {
return relationships;
/**
 * Takes one FlowFile, obtains the record schema through the configured reader, and delegates to
 * checkAndUpdateTableSchema() using a connection from the Hive connection pool service.
 * On success the FlowFile receives the output.table attribute, a provenance event is reported and it is
 * transferred to REL_SUCCESS. A reader-creation failure or an IOException/SQLException routes it to
 * REL_FAILURE; a DiscontinuedException transfers it to Relationship.SELF; any other Throwable is
 * rethrown as (or wrapped in) a ProcessException.
 */
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
// Table name and partition values may use Expression Language evaluated against the FlowFile attributes
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String staticPartitionValuesString = context.getProperty(STATIC_PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
// Stays null when no partition values are configured; checkAndUpdateTableSchema tests for null
List<String> staticPartitionValues = null;
if (!StringUtils.isEmpty(staticPartitionValuesString)) {
// Split on commas and trim each value
staticPartitionValues = Arrays.stream(staticPartitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
final ComponentLog log = getLogger();
try {
final RecordReader reader;
try (final InputStream in = session.read(flowFile)) {
// if we fail to create the RecordReader then we want to route to failure, so we need to
// handle this separately from the other IOExceptions which normally route to retry
try {
reader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
} catch (Exception e) {
throw new RecordReaderFactoryException("Unable to create RecordReader", e);
} catch (RecordReaderFactoryException rrfe) {
"Failed to create {} for {} - routing to failure",
new Object[]{RecordReader.class.getSimpleName(), flowFile},
session.transfer(flowFile, REL_FAILURE);
// Only the schema is taken from the reader here
RecordSchema recordSchema = reader.getSchema();
final boolean createIfNotExists = context.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
final String storageFormat = context.getProperty(TABLE_STORAGE_FORMAT).getValue();
final Hive_1_1DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive_1_1DBCPService.class);
// The connection is closed automatically when this block exits
try (final Connection connection = dbcpService.getConnection()) {
checkAndUpdateTableSchema(session, flowFile, connection, recordSchema, tableName, staticPartitionValues, createIfNotExists, storageFormat);
flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName);
session.getProvenanceReporter().invokeRemoteProcess(flowFile, dbcpService.getConnectionURL());
session.transfer(flowFile, REL_SUCCESS);
} catch (IOException | SQLException e) {
// The table name attribute is also written on the failure path
flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName);
"Exception while processing {} - routing to failure",
new Object[]{flowFile},
session.transfer(flowFile, REL_FAILURE);
} catch (DiscontinuedException e) {
// The input FlowFile processing is discontinued. Keep it in the input queue.
getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
session.transfer(flowFile, Relationship.SELF);
} catch (Throwable t) {
// Pass a ProcessException through unchanged, wrap everything else
throw (t instanceof ProcessException) ? (ProcessException) t : new ProcessException(t);
/**
 * Compares the record schema with the Hive table and issues the DDL needed to make the table accept
 * the records: a CREATE TABLE when the table is absent and creation is enabled, otherwise an
 * ALTER TABLE ... ADD COLUMNS for missing fields and an ALTER TABLE ... PARTITION statement when
 * partition values apply. The resulting table or partition location is written to the output.path attribute.
 * The method is synchronized on the processor instance.
 *
 * @param session           session used to write the output.path attribute
 * @param flowFile          FlowFile that receives the attribute
 * @param conn              open JDBC connection to Hive
 * @param schema            schema of the incoming records
 * @param tableName         target table name, concatenated unquoted into the DDL and DESC statements
 * @param partitionValues   static partition values in partition-column order, or null if none were configured
 * @param createIfNotExists whether a missing table should be created
 * @param storageFormat     storage format used when creating the table
 * @throws IOException on missing or insufficient partition values, and as a wrapper for any other exception
 */
private synchronized void checkAndUpdateTableSchema(final ProcessSession session, final FlowFile flowFile, final Connection conn, final RecordSchema schema,
final String tableName, final List<String> partitionValues,
final boolean createIfNotExists, final String storageFormat) throws IOException {
// Read in the current table metadata, compare it to the reader's schema, and
// add any columns from the schema that are missing in the table
try (Statement s = conn.createStatement()) {
// Determine whether the table exists
ResultSet tables = s.executeQuery("SHOW TABLES");
List<String> tableNames = new ArrayList<>();
String hiveTableName;
// Walk the first column of SHOW TABLES until the rows run out or an empty name is seen
while (tables.next() && StringUtils.isNotEmpty(hiveTableName = tables.getString(1))) {
List<String> columnsToAdd = new ArrayList<>();
String outputPath;
// The membership test is an exact, case-sensitive match against the SHOW TABLES output
if (!tableNames.contains(tableName) && createIfNotExists) {
StringBuilder createTableStatement = new StringBuilder();
for (RecordField recordField : schema.getFields()) {
String recordFieldName = recordField.getFieldName();
// The field does not exist in the table, add it
columnsToAdd.add(recordFieldName + " " + getHiveTypeFromFieldType(recordField.getDataType(), true));
getLogger().debug("Adding column " + recordFieldName + " to table " + tableName);
createTableStatement.append("CREATE TABLE IF NOT EXISTS ")
.append(" (")
.append(String.join(", ", columnsToAdd))
.append(") STORED AS ")
String createTableSql = createTableStatement.toString();
if (StringUtils.isNotEmpty(createTableSql)) {
// Perform the table create
getLogger().info("Executing Hive DDL: " + createTableSql);
// Now that the table is created, describe it and determine its location (for placing the flowfile downstream)
String describeTable = "DESC FORMATTED " + tableName;
ResultSet tableInfo = s.executeQuery(describeTable);
boolean moreRows = tableInfo.next();
boolean locationFound = false;
// Scan forward until a row whose first column starts with "Location:"
while (moreRows && !locationFound) {
String line = tableInfo.getString(1);
if (line.startsWith("Location:")) {
locationFound = true;
continue; // Don't do a next() here, need to get the second column value
moreRows = tableInfo.next();
// Second column of the current row is taken as the table location
outputPath = tableInfo.getString(2);
} else {
List<String> hiveColumns = new ArrayList<>();
String describeTable = "DESC FORMATTED " + tableName;
ResultSet tableInfo = s.executeQuery(describeTable);
// Result is 3 columns, col_name, data_type, comment. Check the first row for a header and skip if so, otherwise add column name
String columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) {
// If the column was a header, check for a blank line to follow and skip it, otherwise add the column name
if (columnName.startsWith("#")) {
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName)) {
// Collect all column names
while (tableInfo.next() && StringUtils.isNotEmpty(columnName = tableInfo.getString(1))) {
// Collect all partition columns
boolean moreRows = true;
boolean headerFound = false;
// Look for the partition section header, stopping at the detailed-information header
while (moreRows && !headerFound) {
String line = tableInfo.getString(1);
if ("# Partition Information".equals(line)) {
headerFound = true;
} else if ("# Detailed Table Information".equals(line)) {
// Not partitioned, exit the loop with headerFound = false
moreRows = tableInfo.next();
List<String> partitionColumns = new ArrayList<>();
// column='value' pairs, used in the ALTER TABLE partition statement
List<String> partitionColumnsEqualsValueList = new ArrayList<>();
// column=value pairs without quotes, used to build the output path
List<String> partitionColumnsLocationList = new ArrayList<>();
if (headerFound) {
// If the table is partitioned, construct the partition=value strings for each partition column
String partitionColumnName;
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) {
// If the column was a header, check for a blank line to follow and skip it, otherwise add the column name
if (columnName.startsWith("#")) {
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName)) {
while (tableInfo.next() && StringUtils.isNotEmpty(partitionColumnName = tableInfo.getString(1))) {
final int partitionColumnsSize = partitionColumns.size();
// A partitioned table needs at least as many static values as it has partition columns
if (partitionValues == null) {
throw new IOException("Found " + partitionColumnsSize + " partition columns but no Static Partition Values were supplied");
final int partitionValuesSize = partitionValues.size();
if (partitionValuesSize < partitionColumnsSize) {
throw new IOException("Found " + partitionColumnsSize + " partition columns but only " + partitionValuesSize + " Static Partition Values were supplied");
// Values are paired with partition columns by position; surplus values are ignored
for (int i = 0; i < partitionColumns.size(); i++) {
// NOTE(review): the value is placed between single quotes without escaping — confirm inputs cannot contain quotes
partitionColumnsEqualsValueList.add(partitionColumns.get(i) + "='" + partitionValues.get(i) + "'");
// Add unquoted version for the output path
partitionColumnsLocationList.add(partitionColumns.get(i) + "=" + partitionValues.get(i));
// Get table location
moreRows = true;
headerFound = false;
while (moreRows && !headerFound) {
String line = tableInfo.getString(1);
if (line.startsWith("Location:")) {
headerFound = true;
continue; // Don't do a next() here, need to get the second column value
moreRows = tableInfo.next();
String tableLocation = tableInfo.getString(2);
StringBuilder alterTableStatement = new StringBuilder();
// Handle new columns
for (RecordField recordField : schema.getFields()) {
// Field names are lower-cased before being compared with the table's column names
String recordFieldName = recordField.getFieldName().toLowerCase();
if (!hiveColumns.contains(recordFieldName) && !partitionColumns.contains(recordFieldName)) {
// The field does not exist in the table (and is not a partition column), add it
columnsToAdd.add(recordFieldName + " " + getHiveTypeFromFieldType(recordField.getDataType(), true));
getLogger().info("Adding column " + recordFieldName + " to table " + tableName);
String alterTableSql;
if (!columnsToAdd.isEmpty()) {
alterTableStatement.append("ALTER TABLE ")
.append(" ADD COLUMNS (")
.append(String.join(", ", columnsToAdd))
alterTableSql = alterTableStatement.toString();
if (StringUtils.isNotEmpty(alterTableSql)) {
// Perform the table update
getLogger().info("Executing Hive DDL: " + alterTableSql);
// Default output path is the table location; replaced below when partition values apply
outputPath = tableLocation;
// Handle new partitions
if (!partitionColumnsEqualsValueList.isEmpty()) {
alterTableSql = "ALTER TABLE " +
tableName +
String.join(", ", partitionColumnsEqualsValueList) +
if (StringUtils.isNotEmpty(alterTableSql)) {
// Perform the table update
getLogger().info("Executing Hive DDL: " + alterTableSql);
// Add attribute for HDFS location of the partition values
outputPath = tableLocation + "/" + String.join("/", partitionColumnsLocationList);
// NOTE(review): the FlowFile returned by putAttribute is discarded here — confirm the caller's reference still carries the attribute
session.putAttribute(flowFile, ATTR_OUTPUT_PATH, outputPath);
} catch (Exception e) {
// Every failure, including the IOExceptions thrown above, is wrapped in a new IOException
throw new IOException(e);
public static String getHiveTypeFromFieldType(DataType rawDataType, boolean hiveFieldNames) {
if (rawDataType == null) {
throw new IllegalArgumentException("Field type is null");
RecordFieldType dataType = rawDataType.getFieldType();
if (RecordFieldType.INT.equals(dataType)) {
return "INT";
if (RecordFieldType.LONG.equals(dataType)) {
return "BIGINT";
if (RecordFieldType.BOOLEAN.equals(dataType)) {
return "BOOLEAN";
if (RecordFieldType.DOUBLE.equals(dataType)) {
return "DOUBLE";
if (RecordFieldType.FLOAT.equals(dataType)) {
return "FLOAT";
if (RecordFieldType.DECIMAL.equals(dataType)) {
return "DECIMAL";
if (RecordFieldType.STRING.equals(dataType) || RecordFieldType.ENUM.equals(dataType)) {
return "STRING";
if (RecordFieldType.DATE.equals(dataType)) {
return "DATE";
if (RecordFieldType.TIME.equals(dataType)) {
return "INT";
if (RecordFieldType.TIMESTAMP.equals(dataType)) {
return "TIMESTAMP";
if (RecordFieldType.ARRAY.equals(dataType)) {
ArrayDataType arrayDataType = (ArrayDataType) rawDataType;
if (RecordFieldType.BYTE.getDataType().equals(arrayDataType.getElementType())) {
return "BINARY";
return "ARRAY<" + getHiveTypeFromFieldType(arrayDataType.getElementType(), hiveFieldNames) + ">";
if (RecordFieldType.MAP.equals(dataType)) {
MapDataType mapDataType = (MapDataType) rawDataType;
return "MAP<STRING, " + getHiveTypeFromFieldType(mapDataType.getValueType(), hiveFieldNames) + ">";
if (RecordFieldType.CHOICE.equals(dataType)) {
ChoiceDataType choiceDataType = (ChoiceDataType) rawDataType;
List<DataType> unionFieldSchemas = choiceDataType.getPossibleSubTypes();
if (unionFieldSchemas != null) {
// Ignore null types in union
List<String> hiveFields = unionFieldSchemas.stream()
.map((it) -> getHiveTypeFromFieldType(it, hiveFieldNames))
// Flatten the field if the union only has one non-null element
return (hiveFields.size() == 1)
? hiveFields.get(0)
: "UNIONTYPE<" + org.apache.commons.lang3.StringUtils.join(hiveFields, ", ") + ">";
return null;
if (RecordFieldType.RECORD.equals(dataType)) {
RecordDataType recordDataType = (RecordDataType) rawDataType;
List<RecordField> recordFields = recordDataType.getChildSchema().getFields();
if (recordFields != null) {
List<String> hiveFields = recordFields.stream().map(
recordField -> ("`" + (hiveFieldNames ? recordField.getFieldName().toLowerCase() : recordField.getFieldName()) + "`:"
+ getHiveTypeFromFieldType(recordField.getDataType(), hiveFieldNames))).collect(Collectors.toList());
return "STRUCT<" + org.apache.commons.lang3.StringUtils.join(hiveFields, ", ") + ">";
return null;
throw new IllegalArgumentException("Error converting Avro type " + dataType.name() + " to Hive type");
@ -14,3 +14,4 @@
# limitations under the License.
@ -0,0 +1,375 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.processors.hive;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.dbcp.hive.Hive_1_1DBCPService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.stubbing.Answer;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestUpdateHive_1_1Table {
// Hadoop configuration resource loaded in setUp()
private static final String TEST_CONF_PATH = "src/test/resources/core-site.xml";
// Scratch directory deleted at the start of every test
private static final String TARGET_HIVE = "target/hive";
// Canned result of SHOW TABLES: the two tables that already "exist"
private static final String[] SHOW_TABLES_COLUMN_NAMES = new String[]{"tab_name"};
private static final String[][] SHOW_TABLES_RESULTSET = new String[][]{
new String[]{"messages"},
new String[]{"users"},
// Canned DESC FORMATTED rows for "messages": two regular columns, two partition columns, then the location
private static final String[] DESC_MESSAGES_TABLE_COLUMN_NAMES = new String[]{"id", "msg"};
private static final String[][] DESC_MESSAGES_TABLE_RESULTSET = new String[][]{
new String[]{"# col_name", "data_type", "comment"},
new String[]{"id", "int", ""},
new String[]{"msg", "string", ""},
new String[]{"", null, null},
new String[]{"# Partition Information", null, null},
new String[]{"# col_name", "data_type", "comment"},
new String[]{"continent", "string", ""},
new String[]{"country", "string", ""},
new String[]{"", null, null},
new String[]{"# Detailed Table Information", null, null},
new String[]{"Location:", "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/messages", null}
// Canned DESC FORMATTED rows for "users": four columns, no partition section, then the location
private static final String[] DESC_USERS_TABLE_COLUMN_NAMES = new String[]{"name", "favorite_number", "favorite_color", "scale"};
private static final String[][] DESC_USERS_TABLE_RESULTSET = new String[][]{
new String[]{"name", "string", ""},
new String[]{"favorite_number", "int", ""},
new String[]{"favorite_color", "string", ""},
new String[]{"scale", "double", ""},
new String[]{"", null, null},
new String[]{"# Detailed Table Information", null, null},
new String[]{"Location:", "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/users", null}
// Canned DESC FORMATTED rows for "newTable"; same columns as "users" with its own location
private static final String[][] DESC_NEW_TABLE_RESULTSET = new String[][]{
new String[]{"", null, null},
new String[]{"name", "string", ""},
new String[]{"favorite_number", "int", ""},
new String[]{"favorite_color", "string", ""},
new String[]{"scale", "double", ""},
new String[]{"", null, null},
new String[]{"# Detailed Table Information", null, null},
new String[]{"Location:", "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/newTable", null}
// Temporary directory; testSetup() derives a database directory path from its root
public TemporaryFolder folder = new TemporaryFolder();
// Created in configure() for the processor under test
private TestRunner runner;
// Fresh processor instance created in setUp()
private UpdateHive_1_1Table processor;
// Loads the test Hadoop configuration, clears the scratch directory and creates a new processor
public void setUp() {
Configuration testConf = new Configuration();
testConf.addResource(new Path(TEST_CONF_PATH));
// Delete any temp files from previous tests
try {
FileUtils.deleteDirectory(new File(TARGET_HIVE));
} catch (IOException ioe) {
// Do nothing, directory may not have existed
processor = new UpdateHive_1_1Table();
// Convenience overload: reader creation does not fail and failAfter is -1
private void configure(final UpdateHive_1_1Table processor, final int numUsers) throws InitializationException {
configure(processor, numUsers, false, -1);
// Convenience overload: uses the default record generator (null)
private void configure(final UpdateHive_1_1Table processor, final int numUsers, boolean failOnCreateReader, int failAfter) throws InitializationException {
configure(processor, numUsers, failOnCreateReader, failAfter, null);
// Creates the TestRunner and registers a MockRecordParser as the record reader. The parser exposes a
// four-field schema (name, favorite_number, favorite_color, scale) and is filled either with numUsers
// generated rows or by the supplied recordGenerator. When failOnCreateReader is true, creating a reader
// throws SchemaNotFoundException.
private void configure(final UpdateHive_1_1Table processor, final int numUsers, final boolean failOnCreateReader, final int failAfter,
final BiFunction<Integer, MockRecordParser, Void> recordGenerator) throws InitializationException {
runner = TestRunners.newTestRunner(processor);
MockRecordParser readerFactory = new MockRecordParser() {
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException {
if (failOnCreateReader) {
throw new SchemaNotFoundException("test");
return super.createRecordReader(variables, in, inputLength, logger);
List<RecordField> fields = Arrays.asList(
new RecordField("name", RecordFieldType.STRING.getDataType()),
new RecordField("favorite_number", RecordFieldType.INT.getDataType()),
new RecordField("favorite_color", RecordFieldType.STRING.getDataType()),
new RecordField("scale", RecordFieldType.DOUBLE.getDataType())
final SimpleRecordSchema recordSchema = new SimpleRecordSchema(fields);
// Mirror the schema into the mock parser field by field
for (final RecordField recordField : recordSchema.getFields()) {
readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
if (recordGenerator == null) {
// Default data: one row per user with values derived from the index
for (int i = 0; i < numUsers; i++) {
readerFactory.addRecord("name" + i, i, "blue" + i, i * 10.0);
} else {
recordGenerator.apply(numUsers, readerFactory);
runner.addControllerService("mock-reader-factory", readerFactory);
runner.setProperty(UpdateHive_1_1Table.RECORD_READER, "mock-reader-factory");
// Wires the processor with the mock reader, a mock connection pool and the "users" table name
public void testSetup() throws Exception {
configure(processor, 0);
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.setProperty(UpdateHive_1_1Table.HIVE_DBCP_SERVICE, "dbcp");
runner.setProperty(UpdateHive_1_1Table.TABLE_NAME, "users");
// Targets the existing "users" table, whose canned columns match the record schema, and expects the
// FlowFile on success with output.table = "users" and output.path = the table location.
public void testNoStatementsExecuted() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHive_1_1Table.TABLE_NAME, "users");
final MockDBCPService service = new MockDBCPService("test");
runner.addControllerService("dbcp", service);
runner.setProperty(UpdateHive_1_1Table.HIVE_DBCP_SERVICE, "dbcp");
// Partition values are supplied, yet the expected output path below has no partition segments
runner.setProperty(UpdateHive_1_1Table.STATIC_PARTITION_VALUES, "Asia,China");
runner.enqueue(new byte[0]);
runner.assertTransferCount(UpdateHive_1_1Table.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive_1_1Table.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateHive_1_1Table.ATTR_OUTPUT_TABLE, "users");
flowFile.assertAttributeEquals(UpdateHive_1_1Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/users");
// Exercises table creation: the table name is supplied as "${table.name}", the create
// strategy is CREATE_IF_NOT_EXISTS and the storage format is PARQUET. The enqueued flow file
// carries a "table.name" attribute of "newTable"; the test expects one flow file on
// REL_SUCCESS with output table "newTable", the matching output path, and exactly one
// statement recorded by the mock service.
// NOTE(review): the call that triggers the processor is not visible in this extract, and the
// final assertEquals is cut off after its expected value -- confirm against the complete file.
public void testCreateTable() throws Exception {
// Second argument is presumably the number of generated records -- TODO confirm.
configure(processor, 1);
runner.setProperty(UpdateHive_1_1Table.TABLE_NAME, "${table.name}");
runner.setProperty(UpdateHive_1_1Table.CREATE_TABLE, UpdateHive_1_1Table.CREATE_IF_NOT_EXISTS);
runner.setProperty(UpdateHive_1_1Table.TABLE_STORAGE_FORMAT, UpdateHive_1_1Table.PARQUET);
final MockDBCPService service = new MockDBCPService("newTable");
runner.addControllerService("dbcp", service);
runner.setProperty(UpdateHive_1_1Table.HIVE_DBCP_SERVICE, "dbcp");
Map<String, String> attrs = new HashMap<>();
// NOTE(review): "db.name" is not referenced by any property set in this test; it may be a
// leftover -- verify whether it is still needed.
attrs.put("db.name", "default");
attrs.put("table.name", "newTable");
runner.enqueue(new byte[0], attrs);
runner.assertTransferCount(UpdateHive_1_1Table.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive_1_1Table.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateHive_1_1Table.ATTR_OUTPUT_TABLE, "newTable");
flowFile.assertAttributeEquals(UpdateHive_1_1Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/newTable");
// Only a single DDL statement is expected to have gone through Statement.execute().
List<String> statements = service.getExecutedStatements();
assertEquals(1, statements.size());
// Expected DDL text; the actual-value argument of this call is missing from the extract.
assertEquals("CREATE TABLE IF NOT EXISTS newTable (name STRING, favorite_number INT, favorite_color STRING, scale DOUBLE) STORED AS PARQUET",
// Targets the "messages" table with static partition values "Asia,China" and expects one flow
// file on REL_SUCCESS with output table "messages" and exactly two recorded statements: an
// ALTER TABLE ... ADD COLUMNS and an ALTER TABLE ... ADD IF NOT EXISTS PARTITION.
// NOTE(review): the call that triggers the processor is not visible in this extract, and both
// final assertEquals calls are cut off after their expected value -- confirm against the
// complete file.
public void testAddColumnsAndPartition() throws Exception {
// Second argument is presumably the number of generated records -- TODO confirm.
configure(processor, 1);
runner.setProperty(UpdateHive_1_1Table.TABLE_NAME, "messages");
final MockDBCPService service = new MockDBCPService("test");
runner.addControllerService("dbcp", service);
runner.setProperty(UpdateHive_1_1Table.HIVE_DBCP_SERVICE, "dbcp");
runner.setProperty(UpdateHive_1_1Table.STATIC_PARTITION_VALUES, "Asia,China");
runner.enqueue(new byte[0]);
runner.assertTransferCount(UpdateHive_1_1Table.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive_1_1Table.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateHive_1_1Table.ATTR_OUTPUT_TABLE, "messages");
List<String> statements = service.getExecutedStatements();
assertEquals(2, statements.size());
// All columns from users table/data should be added to the table, and a new partition should be added
assertEquals("ALTER TABLE messages ADD COLUMNS (name STRING, favorite_number INT, favorite_color STRING, scale DOUBLE)",
// The expected partition spec pairs the values "Asia" and "China" with the columns
// continent and country.
assertEquals("ALTER TABLE messages ADD IF NOT EXISTS PARTITION (continent='Asia', country='China')",
// Targets the "messages" table without setting STATIC_PARTITION_VALUES and expects the flow
// file to be routed to REL_FAILURE (zero flow files on REL_SUCCESS, one on REL_FAILURE).
// NOTE(review): the call that triggers the processor is not visible in this extract --
// confirm against the complete file.
public void testMissingPartitionValues() throws Exception {
// Second argument is presumably the number of generated records -- TODO confirm.
configure(processor, 1);
runner.setProperty(UpdateHive_1_1Table.TABLE_NAME, "messages");
final DBCPService service = new MockDBCPService("test");
runner.addControllerService("dbcp", service);
runner.setProperty(UpdateHive_1_1Table.HIVE_DBCP_SERVICE, "dbcp");
runner.enqueue(new byte[0]);
runner.assertTransferCount(UpdateHive_1_1Table.REL_SUCCESS, 0);
runner.assertTransferCount(UpdateHive_1_1Table.REL_FAILURE, 1);
* Simple implementation only for testing purposes
// Test double for Hive_1_1DBCPService. It hands out mocked JDBC objects instead of real
// connections and records every SQL string passed to Statement.execute() so tests can
// assert on it via getExecutedStatements().
// NOTE(review): several lines of this class (closing braces, some return statements, and
// whatever ties the mocked Statement to the mocked Connection) are not visible in this
// extract -- confirm against the complete file before relying on these comments.
private static class MockDBCPService extends AbstractControllerService implements Hive_1_1DBCPService {
// Only used to build the fake connection URL returned by getConnectionURL().
private final String dbLocation;
// SQL strings captured by the execute() stub, in call order.
private final List<String> executedStatements = new ArrayList<>();
MockDBCPService(final String dbLocation) {
this.dbLocation = dbLocation;
// Always reports the fixed identifier "dbcp".
public String getIdentifier() {
return "dbcp";
// Builds a mocked Connection and a mocked Statement whose executeQuery() and execute() calls
// are answered by the stubs below. Any exception raised while setting up the mocks is
// rethrown as a ProcessException.
public Connection getConnection() throws ProcessException {
try {
Connection conn = mock(Connection.class);
Statement s = mock(Statement.class);
// executeQuery(): choose a canned result set based on the exact query text.
when(s.executeQuery(anyString())).thenAnswer((Answer<ResultSet>) invocation -> {
final String query = (String) invocation.getArguments()[0];
if ("SHOW TABLES".equals(query)) {
return new MockResultSet(SHOW_TABLES_COLUMN_NAMES, SHOW_TABLES_RESULTSET).createResultSet();
// NOTE(review): the bodies of the next two branches are missing from this extract.
} else if ("DESC FORMATTED messages".equals(query)) {
} else if ("DESC FORMATTED users".equals(query)) {
} else if ("DESC FORMATTED newTable".equals(query)) {
return new MockResultSet(DESC_NEW_TABLE_COLUMN_NAMES, DESC_NEW_TABLE_RESULTSET).createResultSet();
} else {
// Any other query gets a result set with no column names and one empty row.
return new MockResultSet(new String[]{}, new String[][]{new String[]{}}).createResultSet();
// execute(): record the SQL text and report false.
when(s.execute(anyString())).thenAnswer((Answer<Boolean>) invocation -> {
executedStatements.add((String) invocation.getArguments()[0]);
return false;
return conn;
} catch (final Exception e) {
// NOTE(review): the caught exception is only concatenated into the message and is not
// passed as the cause, so its stack trace is lost; consider passing e as the cause if
// ProcessException offers a (String, Throwable) constructor -- TODO confirm.
throw new ProcessException("getConnection failed: " + e);
// Returns a fake JDBC URL made of the "jdbc:fake:" prefix and the configured location.
public String getConnectionURL() {
return "jdbc:fake:" + dbLocation;
// Exposes the live list of recorded statements (no defensive copy is made).
List<String> getExecutedStatements() {
return executedStatements;
// Minimal in-memory stand-in for a JDBC ResultSet: wraps a two-dimensional String array and
// produces a mocked ResultSet whose next() and getString(int) walk through that array.
// NOTE(review): closing braces of this class are not visible in this extract.
private static class MockResultSet {
// Column names as supplied by the caller; not read anywhere in the visible code.
String[] colNames;
// Row data: data[row][column], both zero-based.
String[][] data;
// Cursor position; 0 means "before the first row", advanced by the next() stub.
int currentRow;
MockResultSet(String[] colNames, String[][] data) {
this.colNames = colNames;
this.data = data;
currentRow = 0;
// Creates the mocked ResultSet. The cursor lives in this MockResultSet instance, so the
// stubs of every ResultSet created from the same instance share one position.
ResultSet createResultSet() throws SQLException {
ResultSet rs = mock(ResultSet.class);
// next(): returns false when data is null; otherwise advances the cursor and returns true
// while it still points at an existing row.
when(rs.next()).thenAnswer((Answer<Boolean>) invocation -> (data != null) && (++currentRow <= data.length));
// getString(int): column indexes are 1-based.
when(rs.getString(anyInt())).thenAnswer((Answer<String>) invocation -> {
final int index = (int) invocation.getArguments()[0];
if (index < 1) {
throw new SQLException("Columns start with index 1");
// The cursor has moved past the last row.
// NOTE(review): data is dereferenced here without the null check used in next().
if (currentRow > data.length) {
throw new SQLException("This result set is already closed");
// NOTE(review): if getString is called before next(), currentRow is 0 and this indexes
// data[-1]; an index beyond the row's width is likewise unchecked.
return data[currentRow - 1][index - 1];
return rs;
Reference in New Issue
Block a user