From 9ac88d210a181b36679db701766c35d662fa7ce2 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Fri, 1 Sep 2017 11:33:23 +0200 Subject: [PATCH] NIFI-4342 - Add EL support to PutHiveStreaming Signed-off-by: Matthew Burgess This closes #2120 --- .../nifi/processors/hive/PutHiveStreaming.java | 12 ++++++++---- .../processors/hive/TestPutHiveStreaming.java | 16 ++++++++++++---- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java index fe677e5532..afb99fda2f 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java @@ -143,6 +143,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the " + "Hive metastore is 9043.") .required(true) + .expressionLanguageSupported(true) .addValidator(StandardValidators.URI_VALIDATOR) .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with / .build(); @@ -162,6 +163,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { .displayName("Database Name") .description("The name of the database in which to put the data.") .required(true) + .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -170,6 +172,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { .displayName("Table Name") .description("The name of the database table in which to put the data.") .required(true) + .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -179,6 +182,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { .description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must " + "correspond exactly to the order of partition columns specified during the table creation.") .required(false) + .expressionLanguageSupported(true) .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))) // comma-separated list with non-empty entries .build(); @@ -322,9 +326,9 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { public void setup(final ProcessContext context) { ComponentLog log = getLogger(); - final String metastoreUri = context.getProperty(METASTORE_URI).getValue(); - final String dbName = context.getProperty(DB_NAME).getValue(); - final String tableName = context.getProperty(TABLE_NAME).getValue(); + final String metastoreUri = context.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue(); + final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions().getValue(); + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue(); final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean(); final Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger(); final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger(); @@ -565,7 +569,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); final List partitionColumnList; - final String partitionColumns = context.getProperty(PARTITION_COLUMNS).getValue(); + final String partitionColumns = context.getProperty(PARTITION_COLUMNS).evaluateAttributeExpressions().getValue(); if (partitionColumns == null || partitionColumns.isEmpty()) { partitionColumnList = Collections.emptyList(); } else { diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java index 6198619aad..e57bb08711 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java @@ -462,12 +462,20 @@ public class TestPutHiveStreaming { @Test public void onTriggerWithPartitionColumns() throws Exception { - runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083"); - runner.setProperty(PutHiveStreaming.DB_NAME, "default"); - runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); + runner.setVariable("metastore", "thrift://localhost:9083"); + runner.setVariable("database", "default"); + runner.setVariable("table", "users"); + runner.setVariable("partitions", "favorite_number, favorite_color"); + + runner.setProperty(PutHiveStreaming.METASTORE_URI, "${metastore}"); + runner.setProperty(PutHiveStreaming.DB_NAME, "${database}"); + runner.setProperty(PutHiveStreaming.TABLE_NAME, "${table}"); runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); - runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, "favorite_number, favorite_color"); + runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, "${partitions}"); runner.setProperty(PutHiveStreaming.AUTOCREATE_PARTITIONS, "true"); + + runner.assertValid(); + Map user1 = new HashMap() { { put("name", "Joe");