NIFI-4342 - Add EL support to PutHiveStreaming

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2120
This commit is contained in:
Pierre Villard 2017-09-01 11:33:23 +02:00 committed by Matthew Burgess
parent 458c987fe3
commit 9ac88d210a
2 changed files with 20 additions and 8 deletions

View File

@ -143,6 +143,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
.description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the " .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
+ "Hive metastore is 9043.") + "Hive metastore is 9043.")
.required(true) .required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.URI_VALIDATOR) .addValidator(StandardValidators.URI_VALIDATOR)
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with / .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
.build(); .build();
@ -162,6 +163,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
.displayName("Database Name") .displayName("Database Name")
.description("The name of the database in which to put the data.") .description("The name of the database in which to put the data.")
.required(true) .required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
@ -170,6 +172,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
.displayName("Table Name") .displayName("Table Name")
.description("The name of the database table in which to put the data.") .description("The name of the database table in which to put the data.")
.required(true) .required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
@ -179,6 +182,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
.description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must " .description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must "
+ "correspond exactly to the order of partition columns specified during the table creation.") + "correspond exactly to the order of partition columns specified during the table creation.")
.required(false) .required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))) // comma-separated list with non-empty entries .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))) // comma-separated list with non-empty entries
.build(); .build();
@ -322,9 +326,9 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
public void setup(final ProcessContext context) { public void setup(final ProcessContext context) {
ComponentLog log = getLogger(); ComponentLog log = getLogger();
final String metastoreUri = context.getProperty(METASTORE_URI).getValue(); final String metastoreUri = context.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue();
final String dbName = context.getProperty(DB_NAME).getValue(); final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions().getValue();
final String tableName = context.getProperty(TABLE_NAME).getValue(); final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean(); final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
final Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger(); final Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger(); final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger();
@ -565,7 +569,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
final List<String> partitionColumnList; final List<String> partitionColumnList;
final String partitionColumns = context.getProperty(PARTITION_COLUMNS).getValue(); final String partitionColumns = context.getProperty(PARTITION_COLUMNS).evaluateAttributeExpressions().getValue();
if (partitionColumns == null || partitionColumns.isEmpty()) { if (partitionColumns == null || partitionColumns.isEmpty()) {
partitionColumnList = Collections.emptyList(); partitionColumnList = Collections.emptyList();
} else { } else {

View File

@ -462,12 +462,20 @@ public class TestPutHiveStreaming {
@Test @Test
public void onTriggerWithPartitionColumns() throws Exception { public void onTriggerWithPartitionColumns() throws Exception {
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083"); runner.setVariable("metastore", "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default"); runner.setVariable("database", "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setVariable("table", "users");
runner.setVariable("partitions", "favorite_number, favorite_color");
runner.setProperty(PutHiveStreaming.METASTORE_URI, "${metastore}");
runner.setProperty(PutHiveStreaming.DB_NAME, "${database}");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "${table}");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, "favorite_number, favorite_color"); runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, "${partitions}");
runner.setProperty(PutHiveStreaming.AUTOCREATE_PARTITIONS, "true"); runner.setProperty(PutHiveStreaming.AUTOCREATE_PARTITIONS, "true");
runner.assertValid();
Map<String, Object> user1 = new HashMap<String, Object>() { Map<String, Object> user1 = new HashMap<String, Object>() {
{ {
put("name", "Joe"); put("name", "Joe");