NIFI-7989: Add support to UpdateHiveTable for creating external tables

NIFI-7989: Add support for creating partitions, quote identifiers
NIFI-7989: Quote table name when getting description

This closes #4697.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
Authored by Matthew Burgess on 2020-11-30 18:29:11 -05:00; committed by Peter Turcsanyi.
parent 28ca7478d6
commit f29d6a6046
6 changed files with 981 additions and 469 deletions

View File

@ -18,6 +18,8 @@ package org.apache.nifi.processors.hive;
import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
@ -63,6 +65,10 @@ import java.util.stream.Collectors;
@Tags({"hive", "metadata", "jdbc", "database", "table"})
@CapabilityDescription("This processor uses a Hive JDBC connection and incoming records to generate any Hive 1.2 table changes needed to support the incoming records.")
@ReadsAttributes({
@ReadsAttribute(attribute = "hive.table.management.strategy", description = "This attribute is read if the 'Table Management Strategy' property is configured "
+ "to use the value of this attribute. The value of this attribute should correspond (ignoring case) to a valid option of the 'Table Management Strategy' property.")
})
@WritesAttributes({
@WritesAttribute(attribute = "output.table", description = "This attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the target table name."),
@ -94,6 +100,16 @@ public class UpdateHiveTable extends AbstractProcessor {
static final AllowableValue FAIL_IF_NOT_EXISTS = new AllowableValue("Fail If Not Exists", "Fail If Not Exists",
"If the target does not already exist, log an error and route the flowfile to failure");
static final String TABLE_MANAGEMENT_STRATEGY_ATTRIBUTE = "hive.table.management.strategy";
static final AllowableValue MANAGED_TABLE = new AllowableValue("Managed", "Managed",
"Any tables created by this processor will be managed tables (see Hive documentation for details).");
static final AllowableValue EXTERNAL_TABLE = new AllowableValue("External", "External",
"Any tables created by this processor will be external tables located at the `External Table Location` property value.");
static final AllowableValue ATTRIBUTE_DRIVEN_TABLE = new AllowableValue("Use '" + TABLE_MANAGEMENT_STRATEGY_ATTRIBUTE + "' Attribute",
"Use '" + TABLE_MANAGEMENT_STRATEGY_ATTRIBUTE + "' Attribute",
"Inspects the '" + TABLE_MANAGEMENT_STRATEGY_ATTRIBUTE + "' FlowFile attribute to determine the table management strategy. The value "
+ "of this attribute must be a case-insensitive match to one of the other allowable values (Managed, External, e.g.).");
static final String ATTR_OUTPUT_TABLE = "output.table";
static final String ATTR_OUTPUT_PATH = "output.path";
@ -125,7 +141,7 @@ public class UpdateHiveTable extends AbstractProcessor {
.build();
static final PropertyDescriptor CREATE_TABLE = new PropertyDescriptor.Builder()
.name("hive3-create-table")
.name("hive-create-table")
.displayName("Create Table Strategy")
.description("Specifies how to process the target table when it does not exist (create it, fail, e.g.).")
.required(true)
@ -134,8 +150,31 @@ public class UpdateHiveTable extends AbstractProcessor {
.defaultValue(FAIL_IF_NOT_EXISTS.getValue())
.build();
static final PropertyDescriptor TABLE_MANAGEMENT_STRATEGY = new PropertyDescriptor.Builder()
.name("hive-create-table-management")
.displayName("Create Table Management Strategy")
.description("Specifies (when a table is to be created) whether the table is a managed table or an external table. Note that when External is specified, the "
+ "'External Table Location' property must be specified. If the '" + TABLE_MANAGEMENT_STRATEGY_ATTRIBUTE + "' value is selected, 'External Table Location' "
+ "must still be specified, but can contain Expression Language or be set to the empty string, and is ignored when the attribute evaluates to 'Managed'.")
.required(true)
.addValidator(Validator.VALID)
.allowableValues(MANAGED_TABLE, EXTERNAL_TABLE, ATTRIBUTE_DRIVEN_TABLE)
.defaultValue(MANAGED_TABLE.getValue())
.dependsOn(CREATE_TABLE, CREATE_IF_NOT_EXISTS)
.build();
static final PropertyDescriptor EXTERNAL_TABLE_LOCATION = new PropertyDescriptor.Builder()
.name("hive-external-table-location")
.displayName("External Table Location")
.description("Specifies (when an external table is to be created) the file path (in HDFS, e.g.) to store table data.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.dependsOn(TABLE_MANAGEMENT_STRATEGY, EXTERNAL_TABLE, ATTRIBUTE_DRIVEN_TABLE)
.build();
static final PropertyDescriptor TABLE_STORAGE_FORMAT = new PropertyDescriptor.Builder()
.name("hive3-storage-format")
.name("hive-storage-format")
.displayName("Create Table Storage Format")
.description("If a table is to be created, the specified storage format will be used.")
.required(true)
@ -147,7 +186,7 @@ public class UpdateHiveTable extends AbstractProcessor {
static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
.name("hive-query-timeout")
.displayName("Query timeout")
.displayName("Query Timeout")
.description("Sets the number of seconds the driver will wait for a query to execute. "
+ "A value of 0 means no timeout. NOTE: Non-zero values may not be supported by the driver.")
.defaultValue("0")
@ -156,15 +195,18 @@ public class UpdateHiveTable extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor STATIC_PARTITION_VALUES = new PropertyDescriptor.Builder()
.name("hive-part-vals")
.displayName("Static Partition Values")
.description("Specifies a comma-separated list of the values for the partition columns of the target table. This assumes all incoming records belong to the same partition "
+ "and the partition columns are not fields in the record. If specified, this property will often contain "
+ "Expression Language. For example if PartitionRecord is upstream and two partition columns 'name' and 'age' are used, then this property can be set to "
+ "${name},${age}. This property must be set if the table is partitioned, and must not be set if the table is not partitioned. If this property is set, the values "
+ "will be used as the partition values, and the partition.location value will reflect the location of the partition in the filesystem (for use downstream in "
+ "processors like PutHDFS).")
static final PropertyDescriptor PARTITION_CLAUSE = new PropertyDescriptor.Builder()
.name("hive-partition-clause")
.displayName("Partition Clause")
.description("Specifies a comma-separated list of attribute names and optional data types corresponding to the partition columns of the target table. Simply put, if the table is "
+ "partitioned or is to be created with partitions, each partition name should be an attribute on the FlowFile and listed in this property. This assumes all incoming records "
+ "belong to the same partition and the partition columns are not fields in the record. An example of specifying this field is if PartitionRecord "
+ "is upstream and two partition columns 'name' (of type string) and 'age' (of type integer) are used, then this property can be set to 'name string, age int'. The data types "
+ "are optional and if partition(s) are to be created they will default to string type if not specified. For non-string primitive types, specifying the data type for existing "
+ "partition columns is helpful for interpreting the partition value(s). If the table exists, the data types need not be specified "
+ "(and are ignored in that case). This property must be set if the table is partitioned, and there must be an attribute for each partition column in the table. "
+ "The values of the attributes will be used as the partition values, and the resulting output.path attribute value will reflect the location of the partition in the filesystem "
+ "(for use downstream in processors such as PutHDFS).")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@ -190,8 +232,10 @@ public class UpdateHiveTable extends AbstractProcessor {
props.add(RECORD_READER);
props.add(HIVE_DBCP_SERVICE);
props.add(TABLE_NAME);
props.add(STATIC_PARTITION_VALUES);
props.add(PARTITION_CLAUSE);
props.add(CREATE_TABLE);
props.add(TABLE_MANAGEMENT_STRATEGY);
props.add(EXTERNAL_TABLE_LOCATION);
props.add(TABLE_STORAGE_FORMAT);
props.add(QUERY_TIMEOUT);
@ -223,10 +267,10 @@ public class UpdateHiveTable extends AbstractProcessor {
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String staticPartitionValuesString = context.getProperty(STATIC_PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
List<String> staticPartitionValues = null;
if (!StringUtils.isEmpty(staticPartitionValuesString)) {
staticPartitionValues = Arrays.stream(staticPartitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
final String partitionClauseString = context.getProperty(PARTITION_CLAUSE).evaluateAttributeExpressions(flowFile).getValue();
List<String> partitionClauseElements = null;
if (!StringUtils.isEmpty(partitionClauseString)) {
partitionClauseElements = Arrays.stream(partitionClauseString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
}
final ComponentLog log = getLogger();
@ -255,10 +299,39 @@ public class UpdateHiveTable extends AbstractProcessor {
RecordSchema recordSchema = reader.getSchema();
final boolean createIfNotExists = context.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
final String tableManagementStrategy = context.getProperty(TABLE_MANAGEMENT_STRATEGY).getValue();
final boolean managedTable;
if (ATTRIBUTE_DRIVEN_TABLE.getValue().equals(tableManagementStrategy)) {
String tableManagementStrategyAttribute = flowFile.getAttribute(TABLE_MANAGEMENT_STRATEGY_ATTRIBUTE);
if (MANAGED_TABLE.getValue().equalsIgnoreCase(tableManagementStrategyAttribute)) {
managedTable = true;
} else if (EXTERNAL_TABLE.getValue().equalsIgnoreCase(tableManagementStrategyAttribute)) {
managedTable = false;
} else {
log.error("The '{}' attribute either does not exist or has invalid value: {}. Must be one of (ignoring case): Managed, External. "
+ "Routing flowfile to failure",
new Object[]{TABLE_MANAGEMENT_STRATEGY_ATTRIBUTE, tableManagementStrategyAttribute});
session.transfer(flowFile, REL_FAILURE);
return;
}
} else {
managedTable = MANAGED_TABLE.getValue().equals(tableManagementStrategy);
}
// Ensure valid configuration for external tables
if (createIfNotExists && !managedTable && !context.getProperty(EXTERNAL_TABLE_LOCATION).isSet()) {
throw new IOException("External Table Location must be set when Table Management Strategy is set to External");
}
final String externalTableLocation = managedTable ? null : context.getProperty(EXTERNAL_TABLE_LOCATION).evaluateAttributeExpressions(flowFile).getValue();
if (!managedTable && StringUtils.isEmpty(externalTableLocation)) {
log.error("External Table Location has invalid value: {}. Routing flowfile to failure", new Object[]{externalTableLocation});
session.transfer(flowFile, REL_FAILURE);
return;
}
final String storageFormat = context.getProperty(TABLE_STORAGE_FORMAT).getValue();
final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
try (final Connection connection = dbcpService.getConnection()) {
checkAndUpdateTableSchema(session, flowFile, connection, recordSchema, tableName, staticPartitionValues, createIfNotExists, storageFormat);
checkAndUpdateTableSchema(session, flowFile, connection, recordSchema, tableName, partitionClauseElements, createIfNotExists, externalTableLocation, storageFormat);
flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName);
session.getProvenanceReporter().invokeRemoteProcess(flowFile, dbcpService.getConnectionURL());
session.transfer(flowFile, REL_SUCCESS);
@ -266,11 +339,7 @@ public class UpdateHiveTable extends AbstractProcessor {
} catch (IOException | SQLException e) {
flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName);
log.error(
"Exception while processing {} - routing to failure",
new Object[]{flowFile},
e
);
log.error("Exception while processing {} - routing to failure", new Object[]{flowFile}, e);
session.transfer(flowFile, REL_FAILURE);
} catch (DiscontinuedException e) {
@ -283,8 +352,8 @@ public class UpdateHiveTable extends AbstractProcessor {
}
private synchronized void checkAndUpdateTableSchema(final ProcessSession session, final FlowFile flowFile, final Connection conn, final RecordSchema schema,
final String tableName, final List<String> partitionValues,
final boolean createIfNotExists, final String storageFormat) throws IOException {
final String tableName, List<String> partitionClause, final boolean createIfNotExists,
final String externalTableLocation, final String storageFormat) throws IOException {
// Read in the current table metadata, compare it to the reader's schema, and
// add any columns from the schema that are missing in the table
try (Statement s = conn.createStatement()) {
@ -298,20 +367,41 @@ public class UpdateHiveTable extends AbstractProcessor {
List<String> columnsToAdd = new ArrayList<>();
String outputPath;
boolean tableCreated = false;
if (!tableNames.contains(tableName) && createIfNotExists) {
StringBuilder createTableStatement = new StringBuilder();
for (RecordField recordField : schema.getFields()) {
String recordFieldName = recordField.getFieldName();
// The field does not exist in the table, add it
columnsToAdd.add(recordFieldName + " " + NiFiOrcUtils.getHiveTypeFromFieldType(recordField.getDataType(), true));
columnsToAdd.add("`" + recordFieldName + "` " + NiFiOrcUtils.getHiveTypeFromFieldType(recordField.getDataType(), true));
getLogger().debug("Adding column " + recordFieldName + " to table " + tableName);
}
createTableStatement.append("CREATE TABLE IF NOT EXISTS ")
// Handle partition clause
if (partitionClause == null) {
partitionClause = Collections.emptyList();
}
List<String> validatedPartitionClause = new ArrayList<>(partitionClause.size());
for (String partition : partitionClause) {
String[] partitionInfo = partition.split(" ");
if (partitionInfo.length != 2) {
validatedPartitionClause.add("`" + partitionInfo[0] + "` string");
} else {
validatedPartitionClause.add("`" + partitionInfo[0] + "` " + partitionInfo[1]);
}
}
createTableStatement.append("CREATE ")
.append(externalTableLocation == null ? "" : "EXTERNAL ")
.append("TABLE IF NOT EXISTS `")
.append(tableName)
.append(" (")
.append("` (")
.append(String.join(", ", columnsToAdd))
.append(") STORED AS ")
.append(storageFormat);
.append(") ")
.append(validatedPartitionClause.isEmpty() ? "" : "PARTITIONED BY (" + String.join(", ", validatedPartitionClause) + ") ")
.append("STORED AS ")
.append(storageFormat)
.append(externalTableLocation == null ? "" : " LOCATION '" + externalTableLocation + "'");
String createTableSql = createTableStatement.toString();
@ -321,29 +411,55 @@ public class UpdateHiveTable extends AbstractProcessor {
s.execute(createTableSql);
}
// Now that the table is created, describe it and determine its location (for placing the flowfile downstream)
String describeTable = "DESC FORMATTED " + tableName;
ResultSet tableInfo = s.executeQuery(describeTable);
boolean moreRows = tableInfo.next();
boolean locationFound = false;
while (moreRows && !locationFound) {
String line = tableInfo.getString(1);
if (line.startsWith("Location:")) {
locationFound = true;
continue; // Don't do a next() here, need to get the second column value
}
moreRows = tableInfo.next();
}
outputPath = tableInfo.getString(2);
tableCreated = true;
}
} else {
List<String> hiveColumns = new ArrayList<>();
// Process the table (columns, partitions, location, etc.)
List<String> hiveColumns = new ArrayList<>();
String describeTable = "DESC FORMATTED " + tableName;
ResultSet tableInfo = s.executeQuery(describeTable);
// Result is 3 columns, col_name, data_type, comment. Check the first row for a header and skip if so, otherwise add column name
String describeTable = "DESC FORMATTED `" + tableName + "`";
ResultSet tableInfo = s.executeQuery(describeTable);
// Result is 3 columns, col_name, data_type, comment. Check the first row for a header and skip if so, otherwise add column name
tableInfo.next();
String columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) {
hiveColumns.add(columnName);
}
// If the column was a header, check for a blank line to follow and skip it, otherwise add the column name
if (columnName.startsWith("#")) {
tableInfo.next();
String columnName = tableInfo.getString(1);
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName)) {
hiveColumns.add(columnName);
}
}
// Collect all column names
while (tableInfo.next() && StringUtils.isNotEmpty(columnName = tableInfo.getString(1))) {
hiveColumns.add(columnName);
}
// Collect all partition columns
boolean moreRows = true;
boolean headerFound = false;
while (moreRows && !headerFound) {
String line = tableInfo.getString(1);
if ("# Partition Information".equals(line)) {
headerFound = true;
} else if ("# Detailed Table Information".equals(line)) {
// Not partitioned, exit the loop with headerFound = false
break;
}
moreRows = tableInfo.next();
}
List<String> partitionColumns = new ArrayList<>();
List<String> partitionColumnsEqualsValueList = new ArrayList<>();
List<String> partitionColumnsLocationList = new ArrayList<>();
if (headerFound) {
// If the table is partitioned, construct the partition=value strings for each partition column
String partitionColumnName;
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) {
hiveColumns.add(columnName);
}
@ -352,97 +468,65 @@ public class UpdateHiveTable extends AbstractProcessor {
tableInfo.next();
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName)) {
hiveColumns.add(columnName);
partitionColumns.add(columnName);
}
}
// Collect all column names
while (tableInfo.next() && StringUtils.isNotEmpty(columnName = tableInfo.getString(1))) {
hiveColumns.add(columnName);
while (tableInfo.next() && StringUtils.isNotEmpty(partitionColumnName = tableInfo.getString(1))) {
partitionColumns.add(partitionColumnName);
}
// Collect all partition columns
boolean moreRows = true;
boolean headerFound = false;
while (moreRows && !headerFound) {
String line = tableInfo.getString(1);
if ("# Partition Information".equals(line)) {
headerFound = true;
} else if ("# Detailed Table Information".equals(line)) {
// Not partitioned, exit the loop with headerFound = false
break;
}
moreRows = tableInfo.next();
final int partitionColumnsSize = partitionColumns.size();
final int partitionClauseSize = (partitionClause == null) ? 0 : partitionClause.size();
if (partitionClauseSize != partitionColumnsSize) {
throw new IOException("Found " + partitionColumnsSize + " partition columns but " + partitionClauseSize + " partition values were supplied");
}
List<String> partitionColumns = new ArrayList<>();
List<String> partitionColumnsEqualsValueList = new ArrayList<>();
List<String> partitionColumnsLocationList = new ArrayList<>();
if (headerFound) {
// If the table is partitioned, construct the partition=value strings for each partition column
String partitionColumnName;
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) {
hiveColumns.add(columnName);
for (int i = 0; i < partitionClauseSize; i++) {
String partitionName = partitionClause.get(i).split(" ")[0];
String partitionValue = flowFile.getAttribute(partitionName);
if (StringUtils.isEmpty(partitionValue)) {
throw new IOException("No value found for partition value attribute '" + partitionName + "'");
}
// If the column was a header, check for a blank line to follow and skip it, otherwise add the column name
if (columnName.startsWith("#")) {
tableInfo.next();
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName)) {
partitionColumns.add(columnName);
}
}
while (tableInfo.next() && StringUtils.isNotEmpty(partitionColumnName = tableInfo.getString(1))) {
partitionColumns.add(partitionColumnName);
}
final int partitionColumnsSize = partitionColumns.size();
if (partitionValues == null) {
throw new IOException("Found " + partitionColumnsSize + " partition columns but no Static Partition Values were supplied");
}
final int partitionValuesSize = partitionValues.size();
if (partitionValuesSize < partitionColumnsSize) {
throw new IOException("Found " + partitionColumnsSize + " partition columns but only " + partitionValuesSize + " Static Partition Values were supplied");
}
for (int i = 0; i < partitionColumns.size(); i++) {
partitionColumnsEqualsValueList.add(partitionColumns.get(i) + "='" + partitionValues.get(i) + "'");
// Add unquoted version for the output path
partitionColumnsLocationList.add(partitionColumns.get(i) + "=" + partitionValues.get(i));
if (!partitionColumns.contains(partitionName)) {
throw new IOException("Cannot add partition '" + partitionName + "' to existing table");
}
partitionColumnsEqualsValueList.add("`" + partitionName + "`='" + partitionValue + "'");
// Add unquoted version for the output path
partitionColumnsLocationList.add(partitionName + "=" + partitionValue);
}
}
// Get table location
moreRows = true;
headerFound = false;
while (moreRows && !headerFound) {
String line = tableInfo.getString(1);
if (line.startsWith("Location:")) {
headerFound = true;
continue; // Don't do a next() here, need to get the second column value
}
moreRows = tableInfo.next();
// Get table location
moreRows = true;
headerFound = false;
while (moreRows && !headerFound) {
String line = tableInfo.getString(1);
if (line.startsWith("Location:")) {
headerFound = true;
continue; // Don't do a next() here, need to get the second column value
}
String tableLocation = tableInfo.getString(2);
moreRows = tableInfo.next();
}
String tableLocation = tableInfo.getString(2);
String alterTableSql;
// If the table wasn't newly created, alter it accordingly
if (!tableCreated) {
StringBuilder alterTableStatement = new StringBuilder();
// Handle new columns
for (RecordField recordField : schema.getFields()) {
String recordFieldName = recordField.getFieldName().toLowerCase();
if (!hiveColumns.contains(recordFieldName) && !partitionColumns.contains(recordFieldName)) {
// The field does not exist in the table (and is not a partition column), add it
columnsToAdd.add(recordFieldName + " " + NiFiOrcUtils.getHiveTypeFromFieldType(recordField.getDataType(), true));
columnsToAdd.add("`" + recordFieldName + "` " + NiFiOrcUtils.getHiveTypeFromFieldType(recordField.getDataType(), true));
getLogger().info("Adding column " + recordFieldName + " to table " + tableName);
}
}
String alterTableSql;
if (!columnsToAdd.isEmpty()) {
alterTableStatement.append("ALTER TABLE ")
alterTableStatement.append("ALTER TABLE `")
.append(tableName)
.append(" ADD COLUMNS (")
.append("` ADD COLUMNS (")
.append(String.join(", ", columnsToAdd))
.append(")");
@ -453,24 +537,24 @@ public class UpdateHiveTable extends AbstractProcessor {
s.execute(alterTableSql);
}
}
}
outputPath = tableLocation;
outputPath = tableLocation;
// Handle new partitions
if (!partitionColumnsEqualsValueList.isEmpty()) {
alterTableSql = "ALTER TABLE " +
tableName +
" ADD IF NOT EXISTS PARTITION (" +
String.join(", ", partitionColumnsEqualsValueList) +
")";
if (StringUtils.isNotEmpty(alterTableSql)) {
// Perform the table update
getLogger().info("Executing Hive DDL: " + alterTableSql);
s.execute(alterTableSql);
}
// Add attribute for HDFS location of the partition values
outputPath = tableLocation + "/" + String.join("/", partitionColumnsLocationList);
// Handle new partition values
if (!partitionColumnsEqualsValueList.isEmpty()) {
alterTableSql = "ALTER TABLE `" +
tableName +
"` ADD IF NOT EXISTS PARTITION (" +
String.join(", ", partitionColumnsEqualsValueList) +
")";
if (StringUtils.isNotEmpty(alterTableSql)) {
// Perform the table update
getLogger().info("Executing Hive DDL: " + alterTableSql);
s.execute(alterTableSql);
}
// Add attribute for HDFS location of the partition values
outputPath = tableLocation + "/" + String.join("/", partitionColumnsLocationList);
}
session.putAttribute(flowFile, ATTR_OUTPUT_PATH, outputPath);

View File

@ -99,6 +99,15 @@ public class TestUpdateHiveTable {
new String[]{"# Detailed Table Information", null, null},
new String[]{"Location:", "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/users", null}
};
private static final String[][] DESC_EXTERNAL_USERS_TABLE_RESULTSET = new String[][]{
new String[]{"name", "string", ""},
new String[]{"favorite_number", "int", ""},
new String[]{"favorite_color", "string", ""},
new String[]{"scale", "double", ""},
new String[]{"", null, null},
new String[]{"# Detailed Table Information", null, null},
new String[]{"Location:", "hdfs://mycluster:8020/path/to/users", null}
};
private static final String[] DESC_NEW_TABLE_COLUMN_NAMES = DESC_USERS_TABLE_COLUMN_NAMES;
private static final String[][] DESC_NEW_TABLE_RESULTSET = new String[][]{
@ -109,7 +118,7 @@ public class TestUpdateHiveTable {
new String[]{"scale", "double", ""},
new String[]{"", null, null},
new String[]{"# Detailed Table Information", null, null},
new String[]{"Location:", "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/newTable", null}
new String[]{"Location:", "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/_newTable", null}
};
@Rule
@ -187,12 +196,11 @@ public class TestUpdateHiveTable {
runner.assertNotValid();
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
final DBCPService service = new MockHiveConnectionPool(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHiveTable.HIVE_DBCP_SERVICE, "dbcp");
runner.assertNotValid();
runner.assertNotValid();
runner.setProperty(UpdateHiveTable.TABLE_NAME, "users");
runner.assertValid();
runner.run();
@ -203,12 +211,15 @@ public class TestUpdateHiveTable {
public void testNoStatementsExecuted() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHiveTable.TABLE_NAME, "users");
final MockDBCPService service = new MockDBCPService("test");
final MockHiveConnectionPool service = new MockHiveConnectionPool("test");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHiveTable.HIVE_DBCP_SERVICE, "dbcp");
runner.setProperty(UpdateHiveTable.STATIC_PARTITION_VALUES, "Asia,China");
runner.enqueue(new byte[0]);
runner.setProperty(UpdateHiveTable.PARTITION_CLAUSE, "continent, country");
HashMap<String,String> attrs = new HashMap<>();
attrs.put("continent", "Asia");
attrs.put("country", "China");
runner.enqueue(new byte[0], attrs);
runner.run();
runner.assertTransferCount(UpdateHiveTable.REL_SUCCESS, 1);
@ -219,28 +230,87 @@ public class TestUpdateHiveTable {
}
@Test
public void testCreateTable() throws Exception {
public void testCreateManagedTable() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHiveTable.TABLE_NAME, "${table.name}");
runner.setProperty(UpdateHiveTable.CREATE_TABLE, UpdateHiveTable.CREATE_IF_NOT_EXISTS);
runner.setProperty(UpdateHiveTable.TABLE_STORAGE_FORMAT, UpdateHiveTable.PARQUET);
final MockDBCPService service = new MockDBCPService("newTable");
final MockHiveConnectionPool service = new MockHiveConnectionPool("_newTable");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHiveTable.HIVE_DBCP_SERVICE, "dbcp");
Map<String, String> attrs = new HashMap<>();
attrs.put("db.name", "default");
attrs.put("table.name", "newTable");
attrs.put("table.name", "_newTable");
runner.enqueue(new byte[0], attrs);
runner.run();
runner.assertTransferCount(UpdateHiveTable.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHiveTable.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateHiveTable.ATTR_OUTPUT_TABLE, "newTable");
flowFile.assertAttributeEquals(UpdateHiveTable.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/newTable");
flowFile.assertAttributeEquals(UpdateHiveTable.ATTR_OUTPUT_TABLE, "_newTable");
flowFile.assertAttributeEquals(UpdateHiveTable.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/_newTable");
List<String> statements = service.getExecutedStatements();
assertEquals(1, statements.size());
assertEquals("CREATE TABLE IF NOT EXISTS newTable (name STRING, favorite_number INT, favorite_color STRING, scale DOUBLE) STORED AS PARQUET",
assertEquals("CREATE TABLE IF NOT EXISTS `_newTable` (`name` STRING, `favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE) STORED AS PARQUET",
statements.get(0));
}
@Test
public void testCreateManagedTableWithPartition() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHiveTable.TABLE_NAME, "${table.name}");
runner.setProperty(UpdateHiveTable.CREATE_TABLE, UpdateHiveTable.CREATE_IF_NOT_EXISTS);
runner.setProperty(UpdateHiveTable.PARTITION_CLAUSE, "age int");
runner.setProperty(UpdateHiveTable.TABLE_STORAGE_FORMAT, UpdateHiveTable.PARQUET);
final MockHiveConnectionPool service = new MockHiveConnectionPool("_newTable");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHiveTable.HIVE_DBCP_SERVICE, "dbcp");
Map<String, String> attrs = new HashMap<>();
attrs.put("db.name", "default");
attrs.put("table.name", "_newTable");
attrs.put("age", "23");
runner.enqueue(new byte[0], attrs);
runner.run();
runner.assertTransferCount(UpdateHiveTable.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHiveTable.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateHiveTable.ATTR_OUTPUT_TABLE, "_newTable");
flowFile.assertAttributeEquals(UpdateHiveTable.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/_newTable");
List<String> statements = service.getExecutedStatements();
assertEquals(1, statements.size());
assertEquals("CREATE TABLE IF NOT EXISTS `_newTable` (`name` STRING, `favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE) PARTITIONED BY (`age` int) STORED AS PARQUET",
statements.get(0));
}
@Test
public void testCreateExternalTable() throws Exception {
    configure(processor, 1);

    // Configure creation of an EXTERNAL table; the storage format is Parquet.
    runner.setProperty(UpdateHiveTable.TABLE_NAME, "${table.name}");
    runner.setProperty(UpdateHiveTable.CREATE_TABLE, UpdateHiveTable.CREATE_IF_NOT_EXISTS);
    runner.setProperty(UpdateHiveTable.TABLE_MANAGEMENT_STRATEGY, UpdateHiveTable.EXTERNAL_TABLE);
    runner.setProperty(UpdateHiveTable.TABLE_STORAGE_FORMAT, UpdateHiveTable.PARQUET);

    final MockHiveConnectionPool connectionPool = new MockHiveConnectionPool("ext_users");
    runner.addControllerService("dbcp", connectionPool);
    runner.enableControllerService(connectionPool);
    runner.setProperty(UpdateHiveTable.HIVE_DBCP_SERVICE, "dbcp");

    // The External strategy requires a location: invalid until it is set.
    runner.assertNotValid();
    runner.setProperty(UpdateHiveTable.EXTERNAL_TABLE_LOCATION, "/path/to/users");
    runner.assertValid();

    final Map<String, String> attributes = new HashMap<>();
    attributes.put("db.name", "default");
    attributes.put("table.name", "ext_users");
    runner.enqueue(new byte[0], attributes);
    runner.run();

    runner.assertTransferCount(UpdateHiveTable.REL_SUCCESS, 1);
    final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHiveTable.REL_SUCCESS).get(0);
    flowFile.assertAttributeEquals(UpdateHiveTable.ATTR_OUTPUT_TABLE, "ext_users");
    // output.path should reflect the external location, not the managed warehouse path.
    flowFile.assertAttributeEquals(UpdateHiveTable.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/path/to/users");

    final List<String> statements = connectionPool.getExecutedStatements();
    assertEquals(1, statements.size());
    assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS `ext_users` (`name` STRING, `favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE) STORED AS PARQUET "
            + "LOCATION '/path/to/users'",
            statements.get(0));
}
@ -248,12 +318,15 @@ public class TestUpdateHiveTable {
public void testAddColumnsAndPartition() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHiveTable.TABLE_NAME, "messages");
final MockDBCPService service = new MockDBCPService("test");
final MockHiveConnectionPool service = new MockHiveConnectionPool("test");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHiveTable.HIVE_DBCP_SERVICE, "dbcp");
runner.setProperty(UpdateHiveTable.STATIC_PARTITION_VALUES, "Asia,China");
runner.enqueue(new byte[0]);
runner.setProperty(UpdateHiveTable.PARTITION_CLAUSE, "continent, country");
HashMap<String,String> attrs = new HashMap<>();
attrs.put("continent", "Asia");
attrs.put("country", "China");
runner.enqueue(new byte[0], attrs);
runner.run();
runner.assertTransferCount(UpdateHiveTable.REL_SUCCESS, 1);
@ -263,9 +336,9 @@ public class TestUpdateHiveTable {
List<String> statements = service.getExecutedStatements();
assertEquals(2, statements.size());
// All columns from users table/data should be added to the table, and a new partition should be added
assertEquals("ALTER TABLE messages ADD COLUMNS (name STRING, favorite_number INT, favorite_color STRING, scale DOUBLE)",
assertEquals("ALTER TABLE `messages` ADD COLUMNS (`name` STRING, `favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE)",
statements.get(0));
assertEquals("ALTER TABLE messages ADD IF NOT EXISTS PARTITION (continent='Asia', country='China')",
assertEquals("ALTER TABLE `messages` ADD IF NOT EXISTS PARTITION (`continent`='Asia', `country`='China')",
statements.get(1));
}
@ -273,7 +346,7 @@ public class TestUpdateHiveTable {
public void testMissingPartitionValues() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHiveTable.TABLE_NAME, "messages");
final DBCPService service = new MockDBCPService("test");
final DBCPService service = new MockHiveConnectionPool("test");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHiveTable.HIVE_DBCP_SERVICE, "dbcp");
@ -290,12 +363,12 @@ public class TestUpdateHiveTable {
/**
* Simple implementation only for testing purposes
*/
private static class MockDBCPService extends AbstractControllerService implements HiveDBCPService {
private static class MockHiveConnectionPool extends AbstractControllerService implements HiveDBCPService {
private final String dbLocation;
private final List<String> executedStatements = new ArrayList<>();
MockDBCPService(final String dbLocation) {
MockHiveConnectionPool(final String dbLocation) {
this.dbLocation = dbLocation;
}
@ -314,11 +387,13 @@ public class TestUpdateHiveTable {
final String query = invocation.getArgument(0);
if ("SHOW TABLES".equals(query)) {
return new MockResultSet(SHOW_TABLES_COLUMN_NAMES, SHOW_TABLES_RESULTSET).createResultSet();
} else if ("DESC FORMATTED messages".equals(query)) {
} else if ("DESC FORMATTED `messages`".equals(query)) {
return new MockResultSet(DESC_MESSAGES_TABLE_COLUMN_NAMES, DESC_MESSAGES_TABLE_RESULTSET).createResultSet();
} else if ("DESC FORMATTED users".equals(query)) {
} else if ("DESC FORMATTED `users`".equals(query)) {
return new MockResultSet(DESC_USERS_TABLE_COLUMN_NAMES, DESC_USERS_TABLE_RESULTSET).createResultSet();
} else if ("DESC FORMATTED newTable".equals(query)) {
} else if ("DESC FORMATTED `ext_users`".equals(query)) {
return new MockResultSet(DESC_USERS_TABLE_COLUMN_NAMES, DESC_EXTERNAL_USERS_TABLE_RESULTSET).createResultSet();
} else if ("DESC FORMATTED `_newTable`".equals(query)) {
return new MockResultSet(DESC_NEW_TABLE_COLUMN_NAMES, DESC_NEW_TABLE_RESULTSET).createResultSet();
} else {
return new MockResultSet(new String[]{}, new String[][]{new String[]{}}).createResultSet();

View File

@ -18,6 +18,8 @@ package org.apache.nifi.processors.hive;
import org.apache.hadoop.hive.ql.io.orc.NiFiOrcUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
@ -63,6 +65,10 @@ import java.util.stream.Collectors;
@Tags({"hive", "metadata", "jdbc", "database", "table"})
@CapabilityDescription("This processor uses a Hive JDBC connection and incoming records to generate any Hive 3.0+ table changes needed to support the incoming records.")
@ReadsAttributes({
@ReadsAttribute(attribute = "hive.table.management.strategy", description = "This attribute is read if the 'Table Management Strategy' property is configured "
+ "to use the value of this attribute. The value of this attribute should correspond (ignoring case) to a valid option of the 'Table Management Strategy' property.")
})
@WritesAttributes({
@WritesAttribute(attribute = "output.table", description = "This attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the target table name."),
@ -94,6 +100,16 @@ public class UpdateHive3Table extends AbstractProcessor {
static final AllowableValue FAIL_IF_NOT_EXISTS = new AllowableValue("Fail If Not Exists", "Fail If Not Exists",
"If the target does not already exist, log an error and route the flowfile to failure");
// Name of the FlowFile attribute consulted when the attribute-driven table management strategy is selected.
static final String TABLE_MANAGEMENT_STRATEGY_ATTRIBUTE = "hive.table.management.strategy";
// Strategy: tables created by this processor are Hive-managed tables.
static final AllowableValue MANAGED_TABLE = new AllowableValue("Managed", "Managed",
"Any tables created by this processor will be managed tables (see Hive documentation for details).");
// Strategy: tables created by this processor are external tables stored at the 'External Table Location' property value.
static final AllowableValue EXTERNAL_TABLE = new AllowableValue("External", "External",
"Any tables created by this processor will be external tables located at the `External Table Location` property value.");
// Strategy: the managed-vs-external decision is taken per FlowFile from the attribute named above
// (case-insensitive match against the other allowable values).
static final AllowableValue ATTRIBUTE_DRIVEN_TABLE = new AllowableValue("Use '" + TABLE_MANAGEMENT_STRATEGY_ATTRIBUTE + "' Attribute",
"Use '" + TABLE_MANAGEMENT_STRATEGY_ATTRIBUTE + "' Attribute",
"Inspects the '" + TABLE_MANAGEMENT_STRATEGY_ATTRIBUTE + "' FlowFile attribute to determine the table management strategy. The value "
+ "of this attribute must be a case-insensitive match to one of the other allowable values (Managed, External, e.g.).");
// Attribute names written to outgoing FlowFiles: the target table and its filesystem location.
static final String ATTR_OUTPUT_TABLE = "output.table";
static final String ATTR_OUTPUT_PATH = "output.path";
@ -134,6 +150,29 @@ public class UpdateHive3Table extends AbstractProcessor {
.defaultValue(FAIL_IF_NOT_EXISTS.getValue())
.build();
// Property selecting whether created tables are managed, external, or decided per FlowFile
// by the 'hive.table.management.strategy' attribute. Only shown when table creation is enabled
// (dependsOn CREATE_TABLE = Create If Not Exists).
static final PropertyDescriptor TABLE_MANAGEMENT_STRATEGY = new PropertyDescriptor.Builder()
.name("hive3-create-table-management")
.displayName("Create Table Management Strategy")
.description("Specifies (when a table is to be created) whether the table is a managed table or an external table. Note that when External is specified, the "
+ "'External Table Location' property must be specified. If the '" + TABLE_MANAGEMENT_STRATEGY_ATTRIBUTE + "' value is selected, 'External Table Location' "
+ "must still be specified, but can contain Expression Language or be set to the empty string, and is ignored when the attribute evaluates to 'Managed'.")
.required(true)
.addValidator(Validator.VALID)
.allowableValues(MANAGED_TABLE, EXTERNAL_TABLE, ATTRIBUTE_DRIVEN_TABLE)
.defaultValue(MANAGED_TABLE.getValue())
.dependsOn(CREATE_TABLE, CREATE_IF_NOT_EXISTS)
.build();
// Filesystem path for external table data. Required (so it must be set) but supports FlowFile-scoped
// Expression Language; it is only shown when the strategy is External or attribute-driven, and its
// evaluated value is ignored at runtime when the attribute-driven strategy resolves to 'Managed'.
static final PropertyDescriptor EXTERNAL_TABLE_LOCATION = new PropertyDescriptor.Builder()
.name("hive3-external-table-location")
.displayName("External Table Location")
.description("Specifies (when an external table is to be created) the file path (in HDFS, e.g.) to store table data.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.dependsOn(TABLE_MANAGEMENT_STRATEGY, EXTERNAL_TABLE, ATTRIBUTE_DRIVEN_TABLE)
.build();
static final PropertyDescriptor TABLE_STORAGE_FORMAT = new PropertyDescriptor.Builder()
.name("hive3-storage-format")
.displayName("Create Table Storage Format")
@ -147,7 +186,7 @@ public class UpdateHive3Table extends AbstractProcessor {
static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
.name("hive3-query-timeout")
.displayName("Query timeout")
.displayName("Query Timeout")
.description("Sets the number of seconds the driver will wait for a query to execute. "
+ "A value of 0 means no timeout. NOTE: Non-zero values may not be supported by the driver.")
.defaultValue("0")
@ -156,15 +195,18 @@ public class UpdateHive3Table extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor STATIC_PARTITION_VALUES = new PropertyDescriptor.Builder()
.name("hive3-part-vals")
.displayName("Static Partition Values")
.description("Specifies a comma-separated list of the values for the partition columns of the target table. This assumes all incoming records belong to the same partition "
+ "and the partition columns are not fields in the record. If specified, this property will often contain "
+ "Expression Language. For example if PartitionRecord is upstream and two partition columns 'name' and 'age' are used, then this property can be set to "
+ "${name},${age}. This property must be set if the table is partitioned, and must not be set if the table is not partitioned. If this property is set, the values "
+ "will be used as the partition values, and the partition.location value will reflect the location of the partition in the filesystem (for use downstream in "
+ "processors like PutHDFS).")
static final PropertyDescriptor PARTITION_CLAUSE = new PropertyDescriptor.Builder()
.name("hive3-partition-clause")
.displayName("Partition Clause")
.description("Specifies a comma-separated list of attribute names and optional data types corresponding to the partition columns of the target table. Simply put, if the table is "
+ "partitioned or is to be created with partitions, each partition name should be an attribute on the FlowFile and listed in this property. This assumes all incoming records "
+ "belong to the same partition and the partition columns are not fields in the record. An example of specifying this field is if PartitionRecord "
+ "is upstream and two partition columns 'name' (of type string) and 'age' (of type integer) are used, then this property can be set to 'name string, age int'. The data types "
+ "are optional and if partition(s) are to be created they will default to string type if not specified. For non-string primitive types, specifying the data type for existing "
+ "partition columns is helpful for interpreting the partition value(s). If the table exists, the data types need not be specified "
+ "(and are ignored in that case). This property must be set if the table is partitioned, and there must be an attribute for each partition column in the table. "
+ "The values of the attributes will be used as the partition values, and the resulting output.path attribute value will reflect the location of the partition in the filesystem "
+ "(for use downstream in processors such as PutHDFS).")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@ -190,8 +232,10 @@ public class UpdateHive3Table extends AbstractProcessor {
props.add(RECORD_READER);
props.add(HIVE_DBCP_SERVICE);
props.add(TABLE_NAME);
props.add(STATIC_PARTITION_VALUES);
props.add(PARTITION_CLAUSE);
props.add(CREATE_TABLE);
props.add(TABLE_MANAGEMENT_STRATEGY);
props.add(EXTERNAL_TABLE_LOCATION);
props.add(TABLE_STORAGE_FORMAT);
props.add(QUERY_TIMEOUT);
@ -223,10 +267,10 @@ public class UpdateHive3Table extends AbstractProcessor {
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String staticPartitionValuesString = context.getProperty(STATIC_PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
List<String> staticPartitionValues = null;
if (!StringUtils.isEmpty(staticPartitionValuesString)) {
staticPartitionValues = Arrays.stream(staticPartitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
final String partitionClauseString = context.getProperty(PARTITION_CLAUSE).evaluateAttributeExpressions(flowFile).getValue();
List<String> partitionClauseElements = null;
if (!StringUtils.isEmpty(partitionClauseString)) {
partitionClauseElements = Arrays.stream(partitionClauseString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
}
final ComponentLog log = getLogger();
@ -255,25 +299,49 @@ public class UpdateHive3Table extends AbstractProcessor {
RecordSchema recordSchema = reader.getSchema();
final boolean createIfNotExists = context.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
final String tableManagementStrategy = context.getProperty(TABLE_MANAGEMENT_STRATEGY).getValue();
final boolean managedTable;
if (ATTRIBUTE_DRIVEN_TABLE.getValue().equals(tableManagementStrategy)) {
String tableManagementStrategyAttribute = flowFile.getAttribute(TABLE_MANAGEMENT_STRATEGY_ATTRIBUTE);
if (MANAGED_TABLE.getValue().equalsIgnoreCase(tableManagementStrategyAttribute)) {
managedTable = true;
} else if (EXTERNAL_TABLE.getValue().equalsIgnoreCase(tableManagementStrategyAttribute)) {
managedTable = false;
} else {
log.error("The '{}' attribute either does not exist or has invalid value: {}. Must be one of (ignoring case): Managed, External. "
+ "Routing flowfile to failure",
new Object[]{TABLE_MANAGEMENT_STRATEGY_ATTRIBUTE, tableManagementStrategyAttribute});
session.transfer(flowFile, REL_FAILURE);
return;
}
} else {
managedTable = MANAGED_TABLE.getValue().equals(tableManagementStrategy);
}
// Ensure valid configuration for external tables
if (createIfNotExists && !managedTable && !context.getProperty(EXTERNAL_TABLE_LOCATION).isSet()) {
throw new IOException("External Table Location must be set when Table Management Strategy is set to External");
}
final String externalTableLocation = managedTable ? null : context.getProperty(EXTERNAL_TABLE_LOCATION).evaluateAttributeExpressions(flowFile).getValue();
if (!managedTable && StringUtils.isEmpty(externalTableLocation)) {
log.error("External Table Location has invalid value: {}. Routing flowfile to failure", new Object[]{externalTableLocation});
session.transfer(flowFile, REL_FAILURE);
return;
}
final String storageFormat = context.getProperty(TABLE_STORAGE_FORMAT).getValue();
final Hive3DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive3DBCPService.class);
try (final Connection connection = dbcpService.getConnection()) {
checkAndUpdateTableSchema(session, flowFile, connection, recordSchema, tableName, staticPartitionValues, createIfNotExists, storageFormat);
checkAndUpdateTableSchema(session, flowFile, connection, recordSchema, tableName, partitionClauseElements, createIfNotExists, externalTableLocation, storageFormat);
flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName);
session.getProvenanceReporter().invokeRemoteProcess(flowFile, dbcpService.getConnectionURL());
session.transfer(flowFile, REL_SUCCESS);
}
} catch (IOException | SQLException e) {
flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName);
log.error(
"Exception while processing {} - routing to failure",
new Object[]{flowFile},
e
);
log.error("Exception while processing {} - routing to failure", new Object[]{flowFile}, e);
session.transfer(flowFile, REL_FAILURE);
} catch (DiscontinuedException e) {
// The input FlowFile processing is discontinued. Keep it in the input queue.
getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
@ -284,8 +352,8 @@ public class UpdateHive3Table extends AbstractProcessor {
}
private synchronized void checkAndUpdateTableSchema(final ProcessSession session, final FlowFile flowFile, final Connection conn, final RecordSchema schema,
final String tableName, final List<String> partitionValues,
final boolean createIfNotExists, final String storageFormat) throws IOException {
final String tableName, List<String> partitionClause, final boolean createIfNotExists,
final String externalTableLocation, final String storageFormat) throws IOException {
// Read in the current table metadata, compare it to the reader's schema, and
// add any columns from the schema that are missing in the table
try (Statement s = conn.createStatement()) {
@ -299,20 +367,41 @@ public class UpdateHive3Table extends AbstractProcessor {
List<String> columnsToAdd = new ArrayList<>();
String outputPath;
boolean tableCreated = false;
if (!tableNames.contains(tableName) && createIfNotExists) {
StringBuilder createTableStatement = new StringBuilder();
for (RecordField recordField : schema.getFields()) {
String recordFieldName = recordField.getFieldName();
// The field does not exist in the table, add it
columnsToAdd.add(recordFieldName + " " + NiFiOrcUtils.getHiveTypeFromFieldType(recordField.getDataType(), true));
columnsToAdd.add("`" + recordFieldName + "` " + NiFiOrcUtils.getHiveTypeFromFieldType(recordField.getDataType(), true));
getLogger().debug("Adding column " + recordFieldName + " to table " + tableName);
}
createTableStatement.append("CREATE TABLE IF NOT EXISTS ")
// Handle partition clause
if (partitionClause == null) {
partitionClause = Collections.emptyList();
}
List<String> validatedPartitionClause = new ArrayList<>(partitionClause.size());
for (String partition : partitionClause) {
String[] partitionInfo = partition.split(" ");
if (partitionInfo.length != 2) {
validatedPartitionClause.add("`" + partitionInfo[0] + "` string");
} else {
validatedPartitionClause.add("`" + partitionInfo[0] + "` " + partitionInfo[1]);
}
}
createTableStatement.append("CREATE ")
.append(externalTableLocation == null ? "" : "EXTERNAL ")
.append("TABLE IF NOT EXISTS `")
.append(tableName)
.append(" (")
.append("` (")
.append(String.join(", ", columnsToAdd))
.append(") STORED AS ")
.append(storageFormat);
.append(") ")
.append(validatedPartitionClause.isEmpty() ? "" : "PARTITIONED BY (" + String.join(", ", validatedPartitionClause) + ") ")
.append("STORED AS ")
.append(storageFormat)
.append(externalTableLocation == null ? "" : " LOCATION '" + externalTableLocation + "'");
String createTableSql = createTableStatement.toString();
@ -322,29 +411,55 @@ public class UpdateHive3Table extends AbstractProcessor {
s.execute(createTableSql);
}
// Now that the table is created, describe it and determine its location (for placing the flowfile downstream)
String describeTable = "DESC FORMATTED " + tableName;
ResultSet tableInfo = s.executeQuery(describeTable);
boolean moreRows = tableInfo.next();
boolean locationFound = false;
while (moreRows && !locationFound) {
String line = tableInfo.getString(1);
if (line.startsWith("Location:")) {
locationFound = true;
continue; // Don't do a next() here, need to get the second column value
}
moreRows = tableInfo.next();
}
outputPath = tableInfo.getString(2);
tableCreated = true;
}
} else {
List<String> hiveColumns = new ArrayList<>();
// Process the table (columns, partitions, location, etc.)
List<String> hiveColumns = new ArrayList<>();
String describeTable = "DESC FORMATTED " + tableName;
ResultSet tableInfo = s.executeQuery(describeTable);
// Result is 3 columns, col_name, data_type, comment. Check the first row for a header and skip if so, otherwise add column name
String describeTable = "DESC FORMATTED `" + tableName + "`";
ResultSet tableInfo = s.executeQuery(describeTable);
// Result is 3 columns, col_name, data_type, comment. Check the first row for a header and skip if so, otherwise add column name
tableInfo.next();
String columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) {
hiveColumns.add(columnName);
}
// If the column was a header, check for a blank line to follow and skip it, otherwise add the column name
if (columnName.startsWith("#")) {
tableInfo.next();
String columnName = tableInfo.getString(1);
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName)) {
hiveColumns.add(columnName);
}
}
// Collect all column names
while (tableInfo.next() && StringUtils.isNotEmpty(columnName = tableInfo.getString(1))) {
hiveColumns.add(columnName);
}
// Collect all partition columns
boolean moreRows = true;
boolean headerFound = false;
while (moreRows && !headerFound) {
String line = tableInfo.getString(1);
if ("# Partition Information".equals(line)) {
headerFound = true;
} else if ("# Detailed Table Information".equals(line)) {
// Not partitioned, exit the loop with headerFound = false
break;
}
moreRows = tableInfo.next();
}
List<String> partitionColumns = new ArrayList<>();
List<String> partitionColumnsEqualsValueList = new ArrayList<>();
List<String> partitionColumnsLocationList = new ArrayList<>();
if (headerFound) {
// If the table is partitioned, construct the partition=value strings for each partition column
String partitionColumnName;
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) {
hiveColumns.add(columnName);
}
@ -353,97 +468,65 @@ public class UpdateHive3Table extends AbstractProcessor {
tableInfo.next();
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName)) {
hiveColumns.add(columnName);
partitionColumns.add(columnName);
}
}
// Collect all column names
while (tableInfo.next() && StringUtils.isNotEmpty(columnName = tableInfo.getString(1))) {
hiveColumns.add(columnName);
while (tableInfo.next() && StringUtils.isNotEmpty(partitionColumnName = tableInfo.getString(1))) {
partitionColumns.add(partitionColumnName);
}
// Collect all partition columns
boolean moreRows = true;
boolean headerFound = false;
while (moreRows && !headerFound) {
String line = tableInfo.getString(1);
if ("# Partition Information".equals(line)) {
headerFound = true;
} else if ("# Detailed Table Information".equals(line)) {
// Not partitioned, exit the loop with headerFound = false
break;
}
moreRows = tableInfo.next();
final int partitionColumnsSize = partitionColumns.size();
final int partitionClauseSize = (partitionClause == null) ? 0 : partitionClause.size();
if (partitionClauseSize != partitionColumnsSize) {
throw new IOException("Found " + partitionColumnsSize + " partition columns but " + partitionClauseSize + " partition values were supplied");
}
List<String> partitionColumns = new ArrayList<>();
List<String> partitionColumnsEqualsValueList = new ArrayList<>();
List<String> partitionColumnsLocationList = new ArrayList<>();
if (headerFound) {
// If the table is partitioned, construct the partition=value strings for each partition column
String partitionColumnName;
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) {
hiveColumns.add(columnName);
for (int i = 0; i < partitionClauseSize; i++) {
String partitionName = partitionClause.get(i).split(" ")[0];
String partitionValue = flowFile.getAttribute(partitionName);
if (StringUtils.isEmpty(partitionValue)) {
throw new IOException("No value found for partition value attribute '" + partitionName + "'");
}
// If the column was a header, check for a blank line to follow and skip it, otherwise add the column name
if (columnName.startsWith("#")) {
tableInfo.next();
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName)) {
partitionColumns.add(columnName);
}
}
while (tableInfo.next() && StringUtils.isNotEmpty(partitionColumnName = tableInfo.getString(1))) {
partitionColumns.add(partitionColumnName);
}
final int partitionColumnsSize = partitionColumns.size();
if (partitionValues == null) {
throw new IOException("Found " + partitionColumnsSize + " partition columns but no Static Partition Values were supplied");
}
final int partitionValuesSize = partitionValues.size();
if (partitionValuesSize < partitionColumnsSize) {
throw new IOException("Found " + partitionColumnsSize + " partition columns but only " + partitionValuesSize + " Static Partition Values were supplied");
}
for (int i = 0; i < partitionColumns.size(); i++) {
partitionColumnsEqualsValueList.add(partitionColumns.get(i) + "='" + partitionValues.get(i) + "'");
// Add unquoted version for the output path
partitionColumnsLocationList.add(partitionColumns.get(i) + "=" + partitionValues.get(i));
if (!partitionColumns.contains(partitionName)) {
throw new IOException("Cannot add partition '" + partitionName + "' to existing table");
}
partitionColumnsEqualsValueList.add("`" + partitionName + "`='" + partitionValue + "'");
// Add unquoted version for the output path
partitionColumnsLocationList.add(partitionName + "=" + partitionValue);
}
}
// Get table location
moreRows = true;
headerFound = false;
while (moreRows && !headerFound) {
String line = tableInfo.getString(1);
if (line.startsWith("Location:")) {
headerFound = true;
continue; // Don't do a next() here, need to get the second column value
}
moreRows = tableInfo.next();
// Get table location
moreRows = true;
headerFound = false;
while (moreRows && !headerFound) {
String line = tableInfo.getString(1);
if (line.startsWith("Location:")) {
headerFound = true;
continue; // Don't do a next() here, need to get the second column value
}
String tableLocation = tableInfo.getString(2);
moreRows = tableInfo.next();
}
String tableLocation = tableInfo.getString(2);
String alterTableSql;
// If the table wasn't newly created, alter it accordingly
if (!tableCreated) {
StringBuilder alterTableStatement = new StringBuilder();
// Handle new columns
for (RecordField recordField : schema.getFields()) {
String recordFieldName = recordField.getFieldName().toLowerCase();
if (!hiveColumns.contains(recordFieldName) && !partitionColumns.contains(recordFieldName)) {
// The field does not exist in the table (and is not a partition column), add it
columnsToAdd.add(recordFieldName + " " + NiFiOrcUtils.getHiveTypeFromFieldType(recordField.getDataType(), true));
columnsToAdd.add("`" + recordFieldName + "` " + NiFiOrcUtils.getHiveTypeFromFieldType(recordField.getDataType(), true));
getLogger().info("Adding column " + recordFieldName + " to table " + tableName);
}
}
String alterTableSql;
if (!columnsToAdd.isEmpty()) {
alterTableStatement.append("ALTER TABLE ")
alterTableStatement.append("ALTER TABLE `")
.append(tableName)
.append(" ADD COLUMNS (")
.append("` ADD COLUMNS (")
.append(String.join(", ", columnsToAdd))
.append(")");
@ -454,24 +537,24 @@ public class UpdateHive3Table extends AbstractProcessor {
s.execute(alterTableSql);
}
}
}
outputPath = tableLocation;
outputPath = tableLocation;
// Handle new partitions
if (!partitionColumnsEqualsValueList.isEmpty()) {
alterTableSql = "ALTER TABLE " +
tableName +
" ADD IF NOT EXISTS PARTITION (" +
String.join(", ", partitionColumnsEqualsValueList) +
")";
if (StringUtils.isNotEmpty(alterTableSql)) {
// Perform the table update
getLogger().info("Executing Hive DDL: " + alterTableSql);
s.execute(alterTableSql);
}
// Add attribute for HDFS location of the partition values
outputPath = tableLocation + "/" + String.join("/", partitionColumnsLocationList);
// Handle new partition values
if (!partitionColumnsEqualsValueList.isEmpty()) {
alterTableSql = "ALTER TABLE `" +
tableName +
"` ADD IF NOT EXISTS PARTITION (" +
String.join(", ", partitionColumnsEqualsValueList) +
")";
if (StringUtils.isNotEmpty(alterTableSql)) {
// Perform the table update
getLogger().info("Executing Hive DDL: " + alterTableSql);
s.execute(alterTableSql);
}
// Add attribute for HDFS location of the partition values
outputPath = tableLocation + "/" + String.join("/", partitionColumnsLocationList);
}
session.putAttribute(flowFile, ATTR_OUTPUT_PATH, outputPath);

View File

@ -100,6 +100,15 @@ public class TestUpdateHive3Table {
new String[]{"# Detailed Table Information", null, null},
new String[]{"Location:", "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/users", null}
};
// Mock "DESC FORMATTED" result rows for an external table: column rows (name, type, comment),
// a blank separator row, then the detail section whose "Location:" row supplies the table path.
private static final String[][] DESC_EXTERNAL_USERS_TABLE_RESULTSET = new String[][]{
new String[]{"name", "string", ""},
new String[]{"favorite_number", "int", ""},
new String[]{"favorite_color", "string", ""},
new String[]{"scale", "double", ""},
new String[]{"", null, null},
new String[]{"# Detailed Table Information", null, null},
new String[]{"Location:", "hdfs://mycluster:8020/path/to/users", null}
};
private static final String[] DESC_NEW_TABLE_COLUMN_NAMES = DESC_USERS_TABLE_COLUMN_NAMES;
private static final String[][] DESC_NEW_TABLE_RESULTSET = new String[][]{
@ -110,7 +119,7 @@ public class TestUpdateHive3Table {
new String[]{"scale", "double", ""},
new String[]{"", null, null},
new String[]{"# Detailed Table Information", null, null},
new String[]{"Location:", "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/newTable", null}
new String[]{"Location:", "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/_newTable", null}
};
@Rule
@ -187,12 +196,11 @@ public class TestUpdateHive3Table {
runner.assertNotValid();
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
final DBCPService service = new MockHiveConnectionPool(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
runner.assertNotValid();
runner.assertNotValid();
runner.setProperty(UpdateHive3Table.TABLE_NAME, "users");
runner.assertValid();
runner.run();
@ -203,12 +211,15 @@ public class TestUpdateHive3Table {
public void testNoStatementsExecuted() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHive3Table.TABLE_NAME, "users");
final MockDBCPService service = new MockDBCPService("test");
final MockHiveConnectionPool service = new MockHiveConnectionPool("test");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
runner.setProperty(UpdateHive3Table.STATIC_PARTITION_VALUES, "Asia,China");
runner.enqueue(new byte[0]);
runner.setProperty(UpdateHive3Table.PARTITION_CLAUSE, "continent, country");
HashMap<String,String> attrs = new HashMap<>();
attrs.put("continent", "Asia");
attrs.put("country", "China");
runner.enqueue(new byte[0], attrs);
runner.run();
runner.assertTransferCount(UpdateHive3Table.REL_SUCCESS, 1);
@ -219,28 +230,87 @@ public class TestUpdateHive3Table {
}
@Test
public void testCreateTable() throws Exception {
public void testCreateManagedTable() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHive3Table.TABLE_NAME, "${table.name}");
runner.setProperty(UpdateHive3Table.CREATE_TABLE, UpdateHive3Table.CREATE_IF_NOT_EXISTS);
runner.setProperty(UpdateHive3Table.TABLE_STORAGE_FORMAT, UpdateHive3Table.PARQUET);
final MockDBCPService service = new MockDBCPService("newTable");
final MockHiveConnectionPool service = new MockHiveConnectionPool("_newTable");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
Map<String, String> attrs = new HashMap<>();
attrs.put("db.name", "default");
attrs.put("table.name", "newTable");
attrs.put("table.name", "_newTable");
runner.enqueue(new byte[0], attrs);
runner.run();
runner.assertTransferCount(UpdateHive3Table.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive3Table.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_TABLE, "newTable");
flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/newTable");
flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_TABLE, "_newTable");
flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/_newTable");
List<String> statements = service.getExecutedStatements();
assertEquals(1, statements.size());
assertEquals("CREATE TABLE IF NOT EXISTS newTable (name STRING, favorite_number INT, favorite_color STRING, scale DOUBLE) STORED AS PARQUET",
assertEquals("CREATE TABLE IF NOT EXISTS `_newTable` (`name` STRING, `favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE) STORED AS PARQUET",
statements.get(0));
}
@Test
public void testCreateManagedTableWithPartition() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHive3Table.TABLE_NAME, "${table.name}");
runner.setProperty(UpdateHive3Table.CREATE_TABLE, UpdateHive3Table.CREATE_IF_NOT_EXISTS);
runner.setProperty(UpdateHive3Table.PARTITION_CLAUSE, "age int");
runner.setProperty(UpdateHive3Table.TABLE_STORAGE_FORMAT, UpdateHive3Table.PARQUET);
final MockHiveConnectionPool service = new MockHiveConnectionPool("_newTable");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
Map<String, String> attrs = new HashMap<>();
attrs.put("db.name", "default");
attrs.put("table.name", "_newTable");
attrs.put("age", "23");
runner.enqueue(new byte[0], attrs);
runner.run();
runner.assertTransferCount(UpdateHive3Table.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive3Table.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_TABLE, "_newTable");
flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/_newTable");
List<String> statements = service.getExecutedStatements();
assertEquals(1, statements.size());
assertEquals("CREATE TABLE IF NOT EXISTS `_newTable` (`name` STRING, `favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE) PARTITIONED BY (`age` int) STORED AS PARQUET",
statements.get(0));
}
@Test
public void testCreateExternalTable() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHive3Table.TABLE_NAME, "${table.name}");
runner.setProperty(UpdateHive3Table.CREATE_TABLE, UpdateHive3Table.CREATE_IF_NOT_EXISTS);
runner.setProperty(UpdateHive3Table.TABLE_MANAGEMENT_STRATEGY, UpdateHive3Table.EXTERNAL_TABLE);
runner.setProperty(UpdateHive3Table.TABLE_STORAGE_FORMAT, UpdateHive3Table.PARQUET);
final MockHiveConnectionPool service = new MockHiveConnectionPool("ext_users");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
runner.assertNotValid(); // Needs location specified
runner.setProperty(UpdateHive3Table.EXTERNAL_TABLE_LOCATION, "/path/to/users");
runner.assertValid();
Map<String, String> attrs = new HashMap<>();
attrs.put("db.name", "default");
attrs.put("table.name", "ext_users");
runner.enqueue(new byte[0], attrs);
runner.run();
runner.assertTransferCount(UpdateHive3Table.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive3Table.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_TABLE, "ext_users");
flowFile.assertAttributeEquals(UpdateHive3Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/path/to/users");
List<String> statements = service.getExecutedStatements();
assertEquals(1, statements.size());
assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS `ext_users` (`name` STRING, `favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE) STORED AS PARQUET "
+ "LOCATION '/path/to/users'",
statements.get(0));
}
@ -248,12 +318,15 @@ public class TestUpdateHive3Table {
public void testAddColumnsAndPartition() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHive3Table.TABLE_NAME, "messages");
final MockDBCPService service = new MockDBCPService("test");
final MockHiveConnectionPool service = new MockHiveConnectionPool("test");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
runner.setProperty(UpdateHive3Table.STATIC_PARTITION_VALUES, "Asia,China");
runner.enqueue(new byte[0]);
runner.setProperty(UpdateHive3Table.PARTITION_CLAUSE, "continent, country");
HashMap<String,String> attrs = new HashMap<>();
attrs.put("continent", "Asia");
attrs.put("country", "China");
runner.enqueue(new byte[0], attrs);
runner.run();
runner.assertTransferCount(UpdateHive3Table.REL_SUCCESS, 1);
@ -263,9 +336,9 @@ public class TestUpdateHive3Table {
List<String> statements = service.getExecutedStatements();
assertEquals(2, statements.size());
// All columns from users table/data should be added to the table, and a new partition should be added
assertEquals("ALTER TABLE messages ADD COLUMNS (name STRING, favorite_number INT, favorite_color STRING, scale DOUBLE)",
assertEquals("ALTER TABLE `messages` ADD COLUMNS (`name` STRING, `favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE)",
statements.get(0));
assertEquals("ALTER TABLE messages ADD IF NOT EXISTS PARTITION (continent='Asia', country='China')",
assertEquals("ALTER TABLE `messages` ADD IF NOT EXISTS PARTITION (`continent`='Asia', `country`='China')",
statements.get(1));
}
@ -273,7 +346,7 @@ public class TestUpdateHive3Table {
public void testMissingPartitionValues() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHive3Table.TABLE_NAME, "messages");
final DBCPService service = new MockDBCPService("test");
final DBCPService service = new MockHiveConnectionPool("test");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
@ -284,18 +357,56 @@ public class TestUpdateHive3Table {
runner.assertTransferCount(UpdateHive3Table.REL_FAILURE, 1);
}
@Test
public void testCannotAddPartition() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHive3Table.TABLE_NAME, "messages");
final MockHiveConnectionPool service = new MockHiveConnectionPool("test");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
runner.setProperty(UpdateHive3Table.PARTITION_CLAUSE, "continent, country, extra"); // "extra" partition doesn't exist on the table
HashMap<String,String> attrs = new HashMap<>();
attrs.put("continent", "Asia");
attrs.put("country", "China");
attrs.put("extra", "extra");
runner.enqueue(new byte[0], attrs);
runner.run();
runner.assertTransferCount(UpdateHive3Table.REL_SUCCESS, 0);
runner.assertTransferCount(UpdateHive3Table.REL_FAILURE, 1);
}
@Test
public void testMissingAttributeForPartition() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHive3Table.TABLE_NAME, "messages");
final MockHiveConnectionPool service = new MockHiveConnectionPool("test");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHive3Table.HIVE_DBCP_SERVICE, "dbcp");
runner.setProperty(UpdateHive3Table.PARTITION_CLAUSE, "continent, country");
HashMap<String,String> attrs = new HashMap<>();
attrs.put("continent", "Asia");
runner.enqueue(new byte[0], attrs);
runner.run();
runner.assertTransferCount(UpdateHive3Table.REL_SUCCESS, 0);
runner.assertTransferCount(UpdateHive3Table.REL_FAILURE, 1);
}
private static final class MockUpdateHive3Table extends UpdateHive3Table {
}
/**
* Simple implementation only for testing purposes
*/
private static class MockDBCPService extends AbstractControllerService implements Hive3DBCPService {
private static class MockHiveConnectionPool extends AbstractControllerService implements Hive3DBCPService {
private final String dbLocation;
private final List<String> executedStatements = new ArrayList<>();
MockDBCPService(final String dbLocation) {
MockHiveConnectionPool(final String dbLocation) {
this.dbLocation = dbLocation;
}
@ -314,11 +425,13 @@ public class TestUpdateHive3Table {
final String query = invocation.getArgument(0);
if ("SHOW TABLES".equals(query)) {
return new MockResultSet(SHOW_TABLES_COLUMN_NAMES, SHOW_TABLES_RESULTSET).createResultSet();
} else if ("DESC FORMATTED messages".equals(query)) {
} else if ("DESC FORMATTED `messages`".equals(query)) {
return new MockResultSet(DESC_MESSAGES_TABLE_COLUMN_NAMES, DESC_MESSAGES_TABLE_RESULTSET).createResultSet();
} else if ("DESC FORMATTED users".equals(query)) {
} else if ("DESC FORMATTED `users`".equals(query)) {
return new MockResultSet(DESC_USERS_TABLE_COLUMN_NAMES, DESC_USERS_TABLE_RESULTSET).createResultSet();
} else if ("DESC FORMATTED newTable".equals(query)) {
} else if ("DESC FORMATTED `ext_users`".equals(query)) {
return new MockResultSet(DESC_USERS_TABLE_COLUMN_NAMES, DESC_EXTERNAL_USERS_TABLE_RESULTSET).createResultSet();
} else if ("DESC FORMATTED `_newTable`".equals(query)) {
return new MockResultSet(DESC_NEW_TABLE_COLUMN_NAMES, DESC_NEW_TABLE_RESULTSET).createResultSet();
} else {
return new MockResultSet(new String[]{}, new String[][]{new String[]{}}).createResultSet();

View File

@ -17,6 +17,8 @@
package org.apache.nifi.processors.hive;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
@ -68,6 +70,10 @@ import java.util.stream.Collectors;
@Tags({"hive", "metadata", "jdbc", "database", "table"})
@CapabilityDescription("This processor uses a Hive JDBC connection and incoming records to generate any Hive 1.1 table changes needed to support the incoming records.")
@ReadsAttributes({
@ReadsAttribute(attribute = "hive.table.management.strategy", description = "This attribute is read if the 'Table Management Strategy' property is configured "
+ "to use the value of this attribute. The value of this attribute should correspond (ignoring case) to a valid option of the 'Table Management Strategy' property.")
})
@WritesAttributes({
@WritesAttribute(attribute = "output.table", description = "This attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the target table name."),
@ -99,6 +105,16 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
static final AllowableValue FAIL_IF_NOT_EXISTS = new AllowableValue("Fail If Not Exists", "Fail If Not Exists",
"If the target does not already exist, log an error and route the flowfile to failure");
static final String TABLE_MANAGEMENT_STRATEGY_ATTRIBUTE = "hive.table.management.strategy";
static final AllowableValue MANAGED_TABLE = new AllowableValue("Managed", "Managed",
"Any tables created by this processor will be managed tables (see Hive documentation for details).");
static final AllowableValue EXTERNAL_TABLE = new AllowableValue("External", "External",
"Any tables created by this processor will be external tables located at the `External Table Location` property value.");
static final AllowableValue ATTRIBUTE_DRIVEN_TABLE = new AllowableValue("Use '" + TABLE_MANAGEMENT_STRATEGY_ATTRIBUTE + "' Attribute",
"Use '" + TABLE_MANAGEMENT_STRATEGY_ATTRIBUTE + "' Attribute",
"Inspects the '" + TABLE_MANAGEMENT_STRATEGY_ATTRIBUTE + "' FlowFile attribute to determine the table management strategy. The value "
+ "of this attribute must be a case-insensitive match to one of the other allowable values (Managed, External, e.g.).");
static final String ATTR_OUTPUT_TABLE = "output.table";
static final String ATTR_OUTPUT_PATH = "output.path";
@ -130,7 +146,7 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
.build();
static final PropertyDescriptor CREATE_TABLE = new PropertyDescriptor.Builder()
.name("hive3-create-table")
.name("hive11-create-table")
.displayName("Create Table Strategy")
.description("Specifies how to process the target table when it does not exist (create it, fail, e.g.).")
.required(true)
@ -139,8 +155,31 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
.defaultValue(FAIL_IF_NOT_EXISTS.getValue())
.build();
static final PropertyDescriptor TABLE_MANAGEMENT_STRATEGY = new PropertyDescriptor.Builder()
.name("hive11-create-table-management")
.displayName("Create Table Management Strategy")
.description("Specifies (when a table is to be created) whether the table is a managed table or an external table. Note that when External is specified, the "
+ "'External Table Location' property must be specified. If the '" + TABLE_MANAGEMENT_STRATEGY_ATTRIBUTE + "' value is selected, 'External Table Location' "
+ "must still be specified, but can contain Expression Language or be set to the empty string, and is ignored when the attribute evaluates to 'Managed'.")
.required(true)
.addValidator(Validator.VALID)
.allowableValues(MANAGED_TABLE, EXTERNAL_TABLE, ATTRIBUTE_DRIVEN_TABLE)
.defaultValue(MANAGED_TABLE.getValue())
.dependsOn(CREATE_TABLE, CREATE_IF_NOT_EXISTS)
.build();
static final PropertyDescriptor EXTERNAL_TABLE_LOCATION = new PropertyDescriptor.Builder()
.name("hive11-external-table-location")
.displayName("External Table Location")
.description("Specifies (when an external table is to be created) the file path (in HDFS, e.g.) to store table data.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.dependsOn(TABLE_MANAGEMENT_STRATEGY, EXTERNAL_TABLE, ATTRIBUTE_DRIVEN_TABLE)
.build();
static final PropertyDescriptor TABLE_STORAGE_FORMAT = new PropertyDescriptor.Builder()
.name("hive3-storage-format")
.name("hive11-storage-format")
.displayName("Create Table Storage Format")
.description("If a table is to be created, the specified storage format will be used.")
.required(true)
@ -151,8 +190,8 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
.build();
static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
.name("hive11-query-timeout")
.displayName("Query timeout")
.name("hive11query-timeout")
.displayName("Query Timeout")
.description("Sets the number of seconds the driver will wait for a query to execute. "
+ "A value of 0 means no timeout. NOTE: Non-zero values may not be supported by the driver.")
.defaultValue("0")
@ -161,15 +200,18 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor STATIC_PARTITION_VALUES = new PropertyDescriptor.Builder()
.name("hive11-part-vals")
.displayName("Static Partition Values")
.description("Specifies a comma-separated list of the values for the partition columns of the target table. This assumes all incoming records belong to the same partition "
+ "and the partition columns are not fields in the record. If specified, this property will often contain "
+ "Expression Language. For example if PartitionRecord is upstream and two partition columns 'name' and 'age' are used, then this property can be set to "
+ "${name},${age}. This property must be set if the table is partitioned, and must not be set if the table is not partitioned. If this property is set, the values "
+ "will be used as the partition values, and the partition.location value will reflect the location of the partition in the filesystem (for use downstream in "
+ "processors like PutHDFS).")
static final PropertyDescriptor PARTITION_CLAUSE = new PropertyDescriptor.Builder()
.name("hive11-partition-clause")
.displayName("Partition Clause")
.description("Specifies a comma-separated list of attribute names and optional data types corresponding to the partition columns of the target table. Simply put, if the table is "
+ "partitioned or is to be created with partitions, each partition name should be an attribute on the FlowFile and listed in this property. This assumes all incoming records "
+ "belong to the same partition and the partition columns are not fields in the record. An example of specifying this field is if PartitionRecord "
+ "is upstream and two partition columns 'name' (of type string) and 'age' (of type integer) are used, then this property can be set to 'name string, age int'. The data types "
+ "are optional and if partition(s) are to be created they will default to string type if not specified. For non-string primitive types, specifying the data type for existing "
+ "partition columns is helpful for interpreting the partition value(s). If the table exists, the data types need not be specified "
+ "(and are ignored in that case). This property must be set if the table is partitioned, and there must be an attribute for each partition column in the table. "
+ "The values of the attributes will be used as the partition values, and the resulting output.path attribute value will reflect the location of the partition in the filesystem "
+ "(for use downstream in processors such as PutHDFS).")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@ -195,8 +237,10 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
props.add(RECORD_READER);
props.add(HIVE_DBCP_SERVICE);
props.add(TABLE_NAME);
props.add(STATIC_PARTITION_VALUES);
props.add(PARTITION_CLAUSE);
props.add(CREATE_TABLE);
props.add(TABLE_MANAGEMENT_STRATEGY);
props.add(EXTERNAL_TABLE_LOCATION);
props.add(TABLE_STORAGE_FORMAT);
props.add(QUERY_TIMEOUT);
@ -228,10 +272,10 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String staticPartitionValuesString = context.getProperty(STATIC_PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
List<String> staticPartitionValues = null;
if (!StringUtils.isEmpty(staticPartitionValuesString)) {
staticPartitionValues = Arrays.stream(staticPartitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
final String partitionClauseString = context.getProperty(PARTITION_CLAUSE).evaluateAttributeExpressions(flowFile).getValue();
List<String> partitionClauseElements = null;
if (!StringUtils.isEmpty(partitionClauseString)) {
partitionClauseElements = Arrays.stream(partitionClauseString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
}
final ComponentLog log = getLogger();
@ -260,11 +304,39 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
RecordSchema recordSchema = reader.getSchema();
final boolean createIfNotExists = context.getProperty(CREATE_TABLE).getValue().equals(CREATE_IF_NOT_EXISTS.getValue());
final String tableManagementStrategy = context.getProperty(TABLE_MANAGEMENT_STRATEGY).getValue();
final boolean managedTable;
if (ATTRIBUTE_DRIVEN_TABLE.getValue().equals(tableManagementStrategy)) {
String tableManagementStrategyAttribute = flowFile.getAttribute(TABLE_MANAGEMENT_STRATEGY_ATTRIBUTE);
if (MANAGED_TABLE.getValue().equalsIgnoreCase(tableManagementStrategyAttribute)) {
managedTable = true;
} else if (EXTERNAL_TABLE.getValue().equalsIgnoreCase(tableManagementStrategyAttribute)) {
managedTable = false;
} else {
log.error("The '{}' attribute either does not exist or has invalid value: {}. Must be one of (ignoring case): Managed, External. "
+ "Routing flowfile to failure",
new Object[]{TABLE_MANAGEMENT_STRATEGY_ATTRIBUTE, tableManagementStrategyAttribute});
session.transfer(flowFile, REL_FAILURE);
return;
}
} else {
managedTable = MANAGED_TABLE.getValue().equals(tableManagementStrategy);
}
// Ensure valid configuration for external tables
if (createIfNotExists && !managedTable && !context.getProperty(EXTERNAL_TABLE_LOCATION).isSet()) {
throw new IOException("External Table Location must be set when Table Management Strategy is set to External");
}
final String externalTableLocation = managedTable ? null : context.getProperty(EXTERNAL_TABLE_LOCATION).evaluateAttributeExpressions(flowFile).getValue();
if (!managedTable && StringUtils.isEmpty(externalTableLocation)) {
log.error("External Table Location has invalid value: {}. Routing flowfile to failure", new Object[]{externalTableLocation});
session.transfer(flowFile, REL_FAILURE);
return;
}
final String storageFormat = context.getProperty(TABLE_STORAGE_FORMAT).getValue();
final Hive_1_1DBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(Hive_1_1DBCPService.class);
try (final Connection connection = dbcpService.getConnection()) {
checkAndUpdateTableSchema(session, flowFile, connection, recordSchema, tableName, staticPartitionValues, createIfNotExists, storageFormat);
checkAndUpdateTableSchema(session, flowFile, connection, recordSchema, tableName, partitionClauseElements, createIfNotExists, externalTableLocation, storageFormat);
flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName);
session.getProvenanceReporter().invokeRemoteProcess(flowFile, dbcpService.getConnectionURL());
session.transfer(flowFile, REL_SUCCESS);
@ -272,11 +344,7 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
} catch (IOException | SQLException e) {
flowFile = session.putAttribute(flowFile, ATTR_OUTPUT_TABLE, tableName);
log.error(
"Exception while processing {} - routing to failure",
new Object[]{flowFile},
e
);
log.error("Exception while processing {} - routing to failure", new Object[]{flowFile}, e);
session.transfer(flowFile, REL_FAILURE);
} catch (DiscontinuedException e) {
@ -289,8 +357,8 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
}
private synchronized void checkAndUpdateTableSchema(final ProcessSession session, final FlowFile flowFile, final Connection conn, final RecordSchema schema,
final String tableName, final List<String> partitionValues,
final boolean createIfNotExists, final String storageFormat) throws IOException {
final String tableName, List<String> partitionClause, final boolean createIfNotExists,
final String externalTableLocation, final String storageFormat) throws IOException {
// Read in the current table metadata, compare it to the reader's schema, and
// add any columns from the schema that are missing in the table
try (Statement s = conn.createStatement()) {
@ -304,20 +372,41 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
List<String> columnsToAdd = new ArrayList<>();
String outputPath;
boolean tableCreated = false;
if (!tableNames.contains(tableName) && createIfNotExists) {
StringBuilder createTableStatement = new StringBuilder();
for (RecordField recordField : schema.getFields()) {
String recordFieldName = recordField.getFieldName();
// The field does not exist in the table, add it
columnsToAdd.add(recordFieldName + " " + getHiveTypeFromFieldType(recordField.getDataType(), true));
columnsToAdd.add("`" + recordFieldName + "` " + getHiveTypeFromFieldType(recordField.getDataType(), true));
getLogger().debug("Adding column " + recordFieldName + " to table " + tableName);
}
createTableStatement.append("CREATE TABLE IF NOT EXISTS ")
// Handle partition clause
if (partitionClause == null) {
partitionClause = Collections.emptyList();
}
List<String> validatedPartitionClause = new ArrayList<>(partitionClause.size());
for (String partition : partitionClause) {
String[] partitionInfo = partition.split(" ");
if (partitionInfo.length != 2) {
validatedPartitionClause.add("`" + partitionInfo[0] + "` string");
} else {
validatedPartitionClause.add("`" + partitionInfo[0] + "` " + partitionInfo[1]);
}
}
createTableStatement.append("CREATE ")
.append(externalTableLocation == null ? "" : "EXTERNAL ")
.append("TABLE IF NOT EXISTS `")
.append(tableName)
.append(" (")
.append("` (")
.append(String.join(", ", columnsToAdd))
.append(") STORED AS ")
.append(storageFormat);
.append(") ")
.append(validatedPartitionClause.isEmpty() ? "" : "PARTITIONED BY (" + String.join(", ", validatedPartitionClause) + ") ")
.append("STORED AS ")
.append(storageFormat)
.append(externalTableLocation == null ? "" : " LOCATION '" + externalTableLocation + "'");
String createTableSql = createTableStatement.toString();
@ -327,29 +416,55 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
s.execute(createTableSql);
}
// Now that the table is created, describe it and determine its location (for placing the flowfile downstream)
String describeTable = "DESC FORMATTED " + tableName;
ResultSet tableInfo = s.executeQuery(describeTable);
boolean moreRows = tableInfo.next();
boolean locationFound = false;
while (moreRows && !locationFound) {
String line = tableInfo.getString(1);
if (line.startsWith("Location:")) {
locationFound = true;
continue; // Don't do a next() here, need to get the second column value
}
moreRows = tableInfo.next();
}
outputPath = tableInfo.getString(2);
tableCreated = true;
}
} else {
List<String> hiveColumns = new ArrayList<>();
// Process the table (columns, partitions, location, etc.)
List<String> hiveColumns = new ArrayList<>();
String describeTable = "DESC FORMATTED " + tableName;
ResultSet tableInfo = s.executeQuery(describeTable);
// Result is 3 columns, col_name, data_type, comment. Check the first row for a header and skip if so, otherwise add column name
String describeTable = "DESC FORMATTED `" + tableName + "`";
ResultSet tableInfo = s.executeQuery(describeTable);
// Result is 3 columns, col_name, data_type, comment. Check the first row for a header and skip if so, otherwise add column name
tableInfo.next();
String columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) {
hiveColumns.add(columnName);
}
// If the column was a header, check for a blank line to follow and skip it, otherwise add the column name
if (columnName.startsWith("#")) {
tableInfo.next();
String columnName = tableInfo.getString(1);
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName)) {
hiveColumns.add(columnName);
}
}
// Collect all column names
while (tableInfo.next() && StringUtils.isNotEmpty(columnName = tableInfo.getString(1))) {
hiveColumns.add(columnName);
}
// Collect all partition columns
boolean moreRows = true;
boolean headerFound = false;
while (moreRows && !headerFound) {
String line = tableInfo.getString(1);
if ("# Partition Information".equals(line)) {
headerFound = true;
} else if ("# Detailed Table Information".equals(line)) {
// Not partitioned, exit the loop with headerFound = false
break;
}
moreRows = tableInfo.next();
}
List<String> partitionColumns = new ArrayList<>();
List<String> partitionColumnsEqualsValueList = new ArrayList<>();
List<String> partitionColumnsLocationList = new ArrayList<>();
if (headerFound) {
// If the table is partitioned, construct the partition=value strings for each partition column
String partitionColumnName;
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) {
hiveColumns.add(columnName);
}
@ -358,97 +473,65 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
tableInfo.next();
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName)) {
hiveColumns.add(columnName);
partitionColumns.add(columnName);
}
}
// Collect all column names
while (tableInfo.next() && StringUtils.isNotEmpty(columnName = tableInfo.getString(1))) {
hiveColumns.add(columnName);
while (tableInfo.next() && StringUtils.isNotEmpty(partitionColumnName = tableInfo.getString(1))) {
partitionColumns.add(partitionColumnName);
}
// Collect all partition columns
boolean moreRows = true;
boolean headerFound = false;
while (moreRows && !headerFound) {
String line = tableInfo.getString(1);
if ("# Partition Information".equals(line)) {
headerFound = true;
} else if ("# Detailed Table Information".equals(line)) {
// Not partitioned, exit the loop with headerFound = false
break;
}
moreRows = tableInfo.next();
final int partitionColumnsSize = partitionColumns.size();
final int partitionClauseSize = (partitionClause == null) ? 0 : partitionClause.size();
if (partitionClauseSize != partitionColumnsSize) {
throw new IOException("Found " + partitionColumnsSize + " partition columns but " + partitionClauseSize + " partition values were supplied");
}
List<String> partitionColumns = new ArrayList<>();
List<String> partitionColumnsEqualsValueList = new ArrayList<>();
List<String> partitionColumnsLocationList = new ArrayList<>();
if (headerFound) {
// If the table is partitioned, construct the partition=value strings for each partition column
String partitionColumnName;
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName) && !columnName.startsWith("#")) {
hiveColumns.add(columnName);
for (int i = 0; i < partitionClauseSize; i++) {
String partitionName = partitionClause.get(i).split(" ")[0];
String partitionValue = flowFile.getAttribute(partitionName);
if (StringUtils.isEmpty(partitionValue)) {
throw new IOException("No value found for partition value attribute '" + partitionName + "'");
}
// If the column was a header, check for a blank line to follow and skip it, otherwise add the column name
if (columnName.startsWith("#")) {
tableInfo.next();
columnName = tableInfo.getString(1);
if (StringUtils.isNotEmpty(columnName)) {
partitionColumns.add(columnName);
}
}
while (tableInfo.next() && StringUtils.isNotEmpty(partitionColumnName = tableInfo.getString(1))) {
partitionColumns.add(partitionColumnName);
}
final int partitionColumnsSize = partitionColumns.size();
if (partitionValues == null) {
throw new IOException("Found " + partitionColumnsSize + " partition columns but no Static Partition Values were supplied");
}
final int partitionValuesSize = partitionValues.size();
if (partitionValuesSize < partitionColumnsSize) {
throw new IOException("Found " + partitionColumnsSize + " partition columns but only " + partitionValuesSize + " Static Partition Values were supplied");
}
for (int i = 0; i < partitionColumns.size(); i++) {
partitionColumnsEqualsValueList.add(partitionColumns.get(i) + "='" + partitionValues.get(i) + "'");
// Add unquoted version for the output path
partitionColumnsLocationList.add(partitionColumns.get(i) + "=" + partitionValues.get(i));
if (!partitionColumns.contains(partitionName)) {
throw new IOException("Cannot add partition '" + partitionName + "' to existing table");
}
partitionColumnsEqualsValueList.add("`" + partitionName + "`='" + partitionValue + "'");
// Add unquoted version for the output path
partitionColumnsLocationList.add(partitionName + "=" + partitionValue);
}
}
// Get table location
moreRows = true;
headerFound = false;
while (moreRows && !headerFound) {
String line = tableInfo.getString(1);
if (line.startsWith("Location:")) {
headerFound = true;
continue; // Don't do a next() here, need to get the second column value
}
moreRows = tableInfo.next();
// Get table location
moreRows = true;
headerFound = false;
while (moreRows && !headerFound) {
String line = tableInfo.getString(1);
if (line.startsWith("Location:")) {
headerFound = true;
continue; // Don't do a next() here, need to get the second column value
}
String tableLocation = tableInfo.getString(2);
moreRows = tableInfo.next();
}
String tableLocation = tableInfo.getString(2);
String alterTableSql;
// If the table wasn't newly created, alter it accordingly
if (!tableCreated) {
StringBuilder alterTableStatement = new StringBuilder();
// Handle new columns
for (RecordField recordField : schema.getFields()) {
String recordFieldName = recordField.getFieldName().toLowerCase();
if (!hiveColumns.contains(recordFieldName) && !partitionColumns.contains(recordFieldName)) {
// The field does not exist in the table (and is not a partition column), add it
columnsToAdd.add(recordFieldName + " " + getHiveTypeFromFieldType(recordField.getDataType(), true));
columnsToAdd.add("`" + recordFieldName + "` " + getHiveTypeFromFieldType(recordField.getDataType(), true));
getLogger().info("Adding column " + recordFieldName + " to table " + tableName);
}
}
String alterTableSql;
if (!columnsToAdd.isEmpty()) {
alterTableStatement.append("ALTER TABLE ")
alterTableStatement.append("ALTER TABLE `")
.append(tableName)
.append(" ADD COLUMNS (")
.append("` ADD COLUMNS (")
.append(String.join(", ", columnsToAdd))
.append(")");
@ -459,24 +542,24 @@ public class UpdateHive_1_1Table extends AbstractProcessor {
s.execute(alterTableSql);
}
}
}
outputPath = tableLocation;
outputPath = tableLocation;
// Handle new partitions
if (!partitionColumnsEqualsValueList.isEmpty()) {
alterTableSql = "ALTER TABLE " +
tableName +
" ADD IF NOT EXISTS PARTITION (" +
String.join(", ", partitionColumnsEqualsValueList) +
")";
if (StringUtils.isNotEmpty(alterTableSql)) {
// Perform the table update
getLogger().info("Executing Hive DDL: " + alterTableSql);
s.execute(alterTableSql);
}
// Add attribute for HDFS location of the partition values
outputPath = tableLocation + "/" + String.join("/", partitionColumnsLocationList);
// Handle new partition values
if (!partitionColumnsEqualsValueList.isEmpty()) {
alterTableSql = "ALTER TABLE `" +
tableName +
"` ADD IF NOT EXISTS PARTITION (" +
String.join(", ", partitionColumnsEqualsValueList) +
")";
if (StringUtils.isNotEmpty(alterTableSql)) {
// Perform the table update
getLogger().info("Executing Hive DDL: " + alterTableSql);
s.execute(alterTableSql);
}
// Add attribute for HDFS location of the partition values
outputPath = tableLocation + "/" + String.join("/", partitionColumnsLocationList);
}
session.putAttribute(flowFile, ATTR_OUTPUT_PATH, outputPath);

View File

@ -100,6 +100,15 @@ public class TestUpdateHive_1_1Table {
new String[]{"# Detailed Table Information", null, null},
new String[]{"Location:", "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/users", null}
};
private static final String[][] DESC_EXTERNAL_USERS_TABLE_RESULTSET = new String[][]{
new String[]{"name", "string", ""},
new String[]{"favorite_number", "int", ""},
new String[]{"favorite_color", "string", ""},
new String[]{"scale", "double", ""},
new String[]{"", null, null},
new String[]{"# Detailed Table Information", null, null},
new String[]{"Location:", "hdfs://mycluster:8020/path/to/users", null}
};
private static final String[] DESC_NEW_TABLE_COLUMN_NAMES = DESC_USERS_TABLE_COLUMN_NAMES;
private static final String[][] DESC_NEW_TABLE_RESULTSET = new String[][]{
@ -110,7 +119,7 @@ public class TestUpdateHive_1_1Table {
new String[]{"scale", "double", ""},
new String[]{"", null, null},
new String[]{"# Detailed Table Information", null, null},
new String[]{"Location:", "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/newTable", null}
new String[]{"Location:", "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/_newTable", null}
};
@Rule
@ -188,7 +197,7 @@ public class TestUpdateHive_1_1Table {
runner.assertNotValid();
final File tempDir = folder.getRoot();
final File dbDir = new File(tempDir, "db");
final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
final DBCPService service = new MockHiveConnectionPool(dbDir.getAbsolutePath());
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHive_1_1Table.HIVE_DBCP_SERVICE, "dbcp");
@ -199,17 +208,19 @@ public class TestUpdateHive_1_1Table {
runner.run();
}
@Test
public void testNoStatementsExecuted() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHive_1_1Table.TABLE_NAME, "users");
final MockDBCPService service = new MockDBCPService("test");
final MockHiveConnectionPool service = new MockHiveConnectionPool("test");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHive_1_1Table.HIVE_DBCP_SERVICE, "dbcp");
runner.setProperty(UpdateHive_1_1Table.STATIC_PARTITION_VALUES, "Asia,China");
runner.enqueue(new byte[0]);
runner.setProperty(UpdateHive_1_1Table.PARTITION_CLAUSE, "continent, country");
HashMap<String,String> attrs = new HashMap<>();
attrs.put("continent", "Asia");
attrs.put("country", "China");
runner.enqueue(new byte[0], attrs);
runner.run();
runner.assertTransferCount(UpdateHive_1_1Table.REL_SUCCESS, 1);
@ -220,28 +231,87 @@ public class TestUpdateHive_1_1Table {
}
@Test
public void testCreateTable() throws Exception {
public void testCreateManagedTable() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHive_1_1Table.TABLE_NAME, "${table.name}");
runner.setProperty(UpdateHive_1_1Table.CREATE_TABLE, UpdateHive_1_1Table.CREATE_IF_NOT_EXISTS);
runner.setProperty(UpdateHive_1_1Table.TABLE_STORAGE_FORMAT, UpdateHive_1_1Table.PARQUET);
final MockDBCPService service = new MockDBCPService("newTable");
final MockHiveConnectionPool service = new MockHiveConnectionPool("_newTable");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHive_1_1Table.HIVE_DBCP_SERVICE, "dbcp");
Map<String, String> attrs = new HashMap<>();
attrs.put("db.name", "default");
attrs.put("table.name", "newTable");
attrs.put("table.name", "_newTable");
runner.enqueue(new byte[0], attrs);
runner.run();
runner.assertTransferCount(UpdateHive_1_1Table.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive_1_1Table.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateHive_1_1Table.ATTR_OUTPUT_TABLE, "newTable");
flowFile.assertAttributeEquals(UpdateHive_1_1Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/newTable");
flowFile.assertAttributeEquals(UpdateHive_1_1Table.ATTR_OUTPUT_TABLE, "_newTable");
flowFile.assertAttributeEquals(UpdateHive_1_1Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/_newTable");
List<String> statements = service.getExecutedStatements();
assertEquals(1, statements.size());
assertEquals("CREATE TABLE IF NOT EXISTS newTable (name STRING, favorite_number INT, favorite_color STRING, scale DOUBLE) STORED AS PARQUET",
assertEquals("CREATE TABLE IF NOT EXISTS `_newTable` (`name` STRING, `favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE) STORED AS PARQUET",
statements.get(0));
}
@Test
public void testCreateManagedTableWithPartition() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHive_1_1Table.TABLE_NAME, "${table.name}");
runner.setProperty(UpdateHive_1_1Table.CREATE_TABLE, UpdateHive_1_1Table.CREATE_IF_NOT_EXISTS);
runner.setProperty(UpdateHive_1_1Table.PARTITION_CLAUSE, "age int");
runner.setProperty(UpdateHive_1_1Table.TABLE_STORAGE_FORMAT, UpdateHive_1_1Table.PARQUET);
final MockHiveConnectionPool service = new MockHiveConnectionPool("_newTable");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHive_1_1Table.HIVE_DBCP_SERVICE, "dbcp");
Map<String, String> attrs = new HashMap<>();
attrs.put("db.name", "default");
attrs.put("table.name", "_newTable");
attrs.put("age", "23");
runner.enqueue(new byte[0], attrs);
runner.run();
runner.assertTransferCount(UpdateHive_1_1Table.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive_1_1Table.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateHive_1_1Table.ATTR_OUTPUT_TABLE, "_newTable");
flowFile.assertAttributeEquals(UpdateHive_1_1Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/_newTable");
List<String> statements = service.getExecutedStatements();
assertEquals(1, statements.size());
assertEquals("CREATE TABLE IF NOT EXISTS `_newTable` (`name` STRING, `favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE) PARTITIONED BY (`age` int) STORED AS PARQUET",
statements.get(0));
}
@Test
public void testCreateExternalTable() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHive_1_1Table.TABLE_NAME, "${table.name}");
runner.setProperty(UpdateHive_1_1Table.CREATE_TABLE, UpdateHive_1_1Table.CREATE_IF_NOT_EXISTS);
runner.setProperty(UpdateHive_1_1Table.TABLE_MANAGEMENT_STRATEGY, UpdateHive_1_1Table.EXTERNAL_TABLE);
runner.setProperty(UpdateHive_1_1Table.TABLE_STORAGE_FORMAT, UpdateHive_1_1Table.PARQUET);
final MockHiveConnectionPool service = new MockHiveConnectionPool("ext_users");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHive_1_1Table.HIVE_DBCP_SERVICE, "dbcp");
runner.assertNotValid(); // Needs location specified
runner.setProperty(UpdateHive_1_1Table.EXTERNAL_TABLE_LOCATION, "/path/to/users");
runner.assertValid();
Map<String, String> attrs = new HashMap<>();
attrs.put("db.name", "default");
attrs.put("table.name", "ext_users");
runner.enqueue(new byte[0], attrs);
runner.run();
runner.assertTransferCount(UpdateHive_1_1Table.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive_1_1Table.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateHive_1_1Table.ATTR_OUTPUT_TABLE, "ext_users");
flowFile.assertAttributeEquals(UpdateHive_1_1Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/path/to/users");
List<String> statements = service.getExecutedStatements();
assertEquals(1, statements.size());
assertEquals("CREATE EXTERNAL TABLE IF NOT EXISTS `ext_users` (`name` STRING, `favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE) STORED AS PARQUET "
+ "LOCATION '/path/to/users'",
statements.get(0));
}
@ -249,25 +319,27 @@ public class TestUpdateHive_1_1Table {
public void testAddColumnsAndPartition() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHive_1_1Table.TABLE_NAME, "messages");
final MockDBCPService service = new MockDBCPService("test");
final MockHiveConnectionPool service = new MockHiveConnectionPool("test");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHive_1_1Table.HIVE_DBCP_SERVICE, "dbcp");
runner.setProperty(UpdateHive_1_1Table.STATIC_PARTITION_VALUES, "Asia,China");
runner.enqueue(new byte[0]);
runner.setProperty(UpdateHive_1_1Table.PARTITION_CLAUSE, "continent, country");
HashMap<String,String> attrs = new HashMap<>();
attrs.put("continent", "Asia");
attrs.put("country", "China");
runner.enqueue(new byte[0], attrs);
runner.run();
runner.assertTransferCount(UpdateHive_1_1Table.REL_SUCCESS, 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(UpdateHive_1_1Table.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(UpdateHive_1_1Table.ATTR_OUTPUT_TABLE, "messages");
flowFile.assertAttributeEquals(UpdateHive_1_1Table.ATTR_OUTPUT_PATH,
"hdfs://mycluster:8020/warehouse/tablespace/managed/hive/messages/continent=Asia/country=China");
flowFile.assertAttributeEquals(UpdateHive_1_1Table.ATTR_OUTPUT_PATH, "hdfs://mycluster:8020/warehouse/tablespace/managed/hive/messages/continent=Asia/country=China");
List<String> statements = service.getExecutedStatements();
assertEquals(2, statements.size());
// All columns from users table/data should be added to the table, and a new partition should be added
assertEquals("ALTER TABLE messages ADD COLUMNS (name STRING, favorite_number INT, favorite_color STRING, scale DOUBLE)",
assertEquals("ALTER TABLE `messages` ADD COLUMNS (`name` STRING, `favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE)",
statements.get(0));
assertEquals("ALTER TABLE messages ADD IF NOT EXISTS PARTITION (continent='Asia', country='China')",
assertEquals("ALTER TABLE `messages` ADD IF NOT EXISTS PARTITION (`continent`='Asia', `country`='China')",
statements.get(1));
}
@ -275,7 +347,7 @@ public class TestUpdateHive_1_1Table {
public void testMissingPartitionValues() throws Exception {
configure(processor, 1);
runner.setProperty(UpdateHive_1_1Table.TABLE_NAME, "messages");
final DBCPService service = new MockDBCPService("test");
final DBCPService service = new MockHiveConnectionPool("test");
runner.addControllerService("dbcp", service);
runner.enableControllerService(service);
runner.setProperty(UpdateHive_1_1Table.HIVE_DBCP_SERVICE, "dbcp");
@ -289,12 +361,12 @@ public class TestUpdateHive_1_1Table {
/**
* Simple implementation only for testing purposes
*/
private static class MockDBCPService extends AbstractControllerService implements Hive_1_1DBCPService {
private static class MockHiveConnectionPool extends AbstractControllerService implements Hive_1_1DBCPService {
private final String dbLocation;
private final List<String> executedStatements = new ArrayList<>();
MockDBCPService(final String dbLocation) {
MockHiveConnectionPool(final String dbLocation) {
this.dbLocation = dbLocation;
}
@ -313,11 +385,13 @@ public class TestUpdateHive_1_1Table {
final String query = (String) invocation.getArguments()[0];
if ("SHOW TABLES".equals(query)) {
return new MockResultSet(SHOW_TABLES_COLUMN_NAMES, SHOW_TABLES_RESULTSET).createResultSet();
} else if ("DESC FORMATTED messages".equals(query)) {
} else if ("DESC FORMATTED `messages`".equals(query)) {
return new MockResultSet(DESC_MESSAGES_TABLE_COLUMN_NAMES, DESC_MESSAGES_TABLE_RESULTSET).createResultSet();
} else if ("DESC FORMATTED users".equals(query)) {
} else if ("DESC FORMATTED `users`".equals(query)) {
return new MockResultSet(DESC_USERS_TABLE_COLUMN_NAMES, DESC_USERS_TABLE_RESULTSET).createResultSet();
} else if ("DESC FORMATTED newTable".equals(query)) {
} else if ("DESC FORMATTED `ext_users`".equals(query)) {
return new MockResultSet(DESC_USERS_TABLE_COLUMN_NAMES, DESC_EXTERNAL_USERS_TABLE_RESULTSET).createResultSet();
} else if ("DESC FORMATTED `_newTable`".equals(query)) {
return new MockResultSet(DESC_NEW_TABLE_COLUMN_NAMES, DESC_NEW_TABLE_RESULTSET).createResultSet();
} else {
return new MockResultSet(new String[]{}, new String[][]{new String[]{}}).createResultSet();