From e9848f42767338d794f985c72a8be04fe5d1e698 Mon Sep 17 00:00:00 2001 From: Tim Reardon Date: Fri, 12 May 2017 13:25:24 -0400 Subject: [PATCH] NIFI-3881 Fix PutHiveStreaming EL evaluation Signed-off-by: Matt Burgess This closes #1791 --- .../processors/hive/PutHiveStreaming.java | 5 ++- .../processors/hive/TestPutHiveStreaming.java | 31 ------------------- 2 files changed, 2 insertions(+), 34 deletions(-) diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java index e7d85cde5d..f08310e7cc 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java @@ -179,7 +179,6 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { .description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must " + "correspond exactly to the order of partition columns specified during the table creation.") .required(false) - .expressionLanguageSupported(false) .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))) // comma-separated list with non-empty entries .build(); @@ -329,7 +328,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean(); final Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger(); final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger(); - final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger(); + final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).evaluateAttributeExpressions().asInteger(); final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue(); hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles); @@ -559,7 +558,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor { } final ComponentLog log = getLogger(); - final Integer recordsPerTxn = context.getProperty(RECORDS_PER_TXN).asInteger(); + final Integer recordsPerTxn = context.getProperty(RECORDS_PER_TXN).evaluateAttributeExpressions(flowFile).asInteger(); // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore) ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader(); diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java index f642607433..6198619aad 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java @@ -107,7 +107,6 @@ public class TestPutHiveStreaming { @Test public void testSetup() throws Exception { - runner.setValidateExpressionUsage(false); runner.assertNotValid(); runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083"); runner.setProperty(PutHiveStreaming.DB_NAME, "default"); @@ -119,7 +118,6 @@ public class TestPutHiveStreaming { @Test public void testUgiGetsCleared() { - runner.setValidateExpressionUsage(false); runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083"); runner.setProperty(PutHiveStreaming.DB_NAME, "default"); runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); @@ -133,7 +131,6 @@ public class TestPutHiveStreaming { when(hiveConf.get(SecurityUtil.HADOOP_SECURITY_AUTHENTICATION)).thenReturn(SecurityUtil.KERBEROS); ugi = mock(UserGroupInformation.class); when(hiveConfigurator.authenticate(eq(hiveConf), anyString(), anyString(), anyLong(), any())).thenReturn(ugi); - runner.setValidateExpressionUsage(false); runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083"); runner.setProperty(PutHiveStreaming.DB_NAME, "default"); runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); @@ -149,7 +146,6 @@ public class TestPutHiveStreaming { @Test public void testSetupBadPartitionColumns() throws Exception { - runner.setValidateExpressionUsage(false); runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083"); runner.setProperty(PutHiveStreaming.DB_NAME, "default"); runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); @@ -161,7 +157,6 @@ public class TestPutHiveStreaming { @Test(expected = AssertionError.class) public void testSetupWithKerberosAuthFailed() throws Exception { - runner.setValidateExpressionUsage(false); runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083"); runner.setProperty(PutHiveStreaming.DB_NAME, "default"); runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); @@ -188,7 +183,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.DB_NAME, "default"); runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); - runner.setValidateExpressionUsage(false); Map user1 = new HashMap() { { put("name", "Joe"); @@ -208,7 +202,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.DB_NAME, "default"); runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); - runner.setValidateExpressionUsage(false); runner.enqueue("I am not an Avro record".getBytes()); runner.run(); @@ -222,7 +215,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true"); - runner.setValidateExpressionUsage(false); runner.enqueue("I am not an Avro record".getBytes()); try { runner.run(); @@ -243,7 +235,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.DB_NAME, "default"); runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "100"); - runner.setValidateExpressionUsage(false); Map user1 = new HashMap() { { put("name", "Joe"); @@ -277,7 +268,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.DB_NAME, "default"); runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2"); - runner.setValidateExpressionUsage(false); Map user1 = new HashMap() { { put("name", "Joe"); @@ -311,7 +301,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.DB_NAME, "default"); runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2"); - runner.setValidateExpressionUsage(false); processor.setGenerateWriteFailure(true, 1); Map user1 = new HashMap() { { @@ -359,7 +348,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2"); runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true"); - runner.setValidateExpressionUsage(false); processor.setGenerateWriteFailure(true, 1); Map user1 = new HashMap() { { @@ -401,7 +389,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2"); runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true"); - runner.setValidateExpressionUsage(false); // The first two records are committed, then an issue will happen at the 3rd record. processor.setGenerateWriteFailure(true, 2); Map user1 = new HashMap() { @@ -481,7 +468,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, "favorite_number, favorite_color"); runner.setProperty(PutHiveStreaming.AUTOCREATE_PARTITIONS, "true"); - runner.setValidateExpressionUsage(false); Map user1 = new HashMap() { { put("name", "Joe"); @@ -507,7 +493,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, "favorite_food"); runner.setProperty(PutHiveStreaming.AUTOCREATE_PARTITIONS, "false"); - runner.setValidateExpressionUsage(false); Map user1 = new HashMap() { { put("name", "Joe"); @@ -533,7 +518,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, "favorite_food"); runner.setProperty(PutHiveStreaming.AUTOCREATE_PARTITIONS, "false"); runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true"); - runner.setValidateExpressionUsage(false); Map user1 = new HashMap() { { put("name", "Joe"); @@ -563,7 +547,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.DB_NAME, "default"); runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "2"); - runner.setValidateExpressionUsage(false); Map user1 = new HashMap() { { put("name", "Joe"); @@ -587,7 +570,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); runner.setProperty(PutHiveStreaming.HEARTBEAT_INTERVAL, "1"); - runner.setValidateExpressionUsage(false); Map user1 = new HashMap() { { put("name", "Joe"); @@ -613,7 +595,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.DB_NAME, "default"); runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); - runner.setValidateExpressionUsage(false); Map user1 = new HashMap() { { put("name", "Joe"); @@ -636,7 +617,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true"); - runner.setValidateExpressionUsage(false); Map user1 = new HashMap() { { put("name", "Joe"); @@ -665,7 +645,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.DB_NAME, "default"); runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); - runner.setValidateExpressionUsage(false); Map user1 = new HashMap() { { put("name", "Joe"); @@ -686,7 +665,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true"); - runner.setValidateExpressionUsage(false); Map user1 = new HashMap() { { put("name", "Joe"); @@ -711,7 +689,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.DB_NAME, "default"); runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); - runner.setValidateExpressionUsage(false); Map user1 = new HashMap() { { put("name", "Joe"); @@ -739,7 +716,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true"); - runner.setValidateExpressionUsage(false); Map user1 = new HashMap() { { put("name", "Joe"); @@ -772,7 +748,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.DB_NAME, "default"); runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); - runner.setValidateExpressionUsage(false); Map user1 = new HashMap() { { put("name", "Joe"); @@ -794,7 +769,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true"); - runner.setValidateExpressionUsage(false); Map user1 = new HashMap() { { put("name", "Joe"); @@ -822,7 +796,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.DB_NAME, "default"); runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); - runner.setValidateExpressionUsage(false); Map user1 = new HashMap() { { put("name", "Joe"); @@ -845,7 +818,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true"); - runner.setValidateExpressionUsage(false); Map user1 = new HashMap() { { put("name", "Joe"); @@ -874,7 +846,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.DB_NAME, "default"); runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); - runner.setValidateExpressionUsage(false); Map user1 = new HashMap() { { put("name", "Joe"); @@ -897,7 +868,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true"); - runner.setValidateExpressionUsage(false); Map user1 = new HashMap() { { put("name", "Joe"); @@ -926,7 +896,6 @@ public class TestPutHiveStreaming { runner.setProperty(PutHiveStreaming.DB_NAME, "default"); runner.setProperty(PutHiveStreaming.TABLE_NAME, "users"); runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100"); - runner.setValidateExpressionUsage(false); Map user1 = new HashMap() { { put("name", "Joe");