NIFI-3881 Fix PutHiveStreaming EL evaluation

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #1791
This commit is contained in:
Tim Reardon 2017-05-12 13:25:24 -04:00 committed by Matt Burgess
parent afd2b04afd
commit e9848f4276
2 changed files with 2 additions and 34 deletions

View File

@@ -179,7 +179,6 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
.description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must "
+ "correspond exactly to the order of partition columns specified during the table creation.")
.required(false)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))) // comma-separated list with non-empty entries
.build();
@@ -329,7 +328,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
final Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger();
final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger();
final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).evaluateAttributeExpressions().asInteger();
final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
@@ -559,7 +558,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
}
final ComponentLog log = getLogger();
final Integer recordsPerTxn = context.getProperty(RECORDS_PER_TXN).asInteger();
final Integer recordsPerTxn = context.getProperty(RECORDS_PER_TXN).evaluateAttributeExpressions(flowFile).asInteger();
// Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();

View File

@@ -107,7 +107,6 @@ public class TestPutHiveStreaming {
@Test
public void testSetup() throws Exception {
runner.setValidateExpressionUsage(false);
runner.assertNotValid();
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
@@ -119,7 +118,6 @@ public class TestPutHiveStreaming {
@Test
public void testUgiGetsCleared() {
runner.setValidateExpressionUsage(false);
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
@@ -133,7 +131,6 @@ public class TestPutHiveStreaming {
when(hiveConf.get(SecurityUtil.HADOOP_SECURITY_AUTHENTICATION)).thenReturn(SecurityUtil.KERBEROS);
ugi = mock(UserGroupInformation.class);
when(hiveConfigurator.authenticate(eq(hiveConf), anyString(), anyString(), anyLong(), any())).thenReturn(ugi);
runner.setValidateExpressionUsage(false);
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
@@ -149,7 +146,6 @@ public class TestPutHiveStreaming {
@Test
public void testSetupBadPartitionColumns() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
@@ -161,7 +157,6 @@ public class TestPutHiveStreaming {
@Test(expected = AssertionError.class)
public void testSetupWithKerberosAuthFailed() throws Exception {
runner.setValidateExpressionUsage(false);
runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
@@ -188,7 +183,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
@@ -208,7 +202,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setValidateExpressionUsage(false);
runner.enqueue("I am not an Avro record".getBytes());
runner.run();
@@ -222,7 +215,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
runner.setValidateExpressionUsage(false);
runner.enqueue("I am not an Avro record".getBytes());
try {
runner.run();
@@ -243,7 +235,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "100");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
@@ -277,7 +268,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
@@ -311,7 +301,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2");
runner.setValidateExpressionUsage(false);
processor.setGenerateWriteFailure(true, 1);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
@@ -359,7 +348,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2");
runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
runner.setValidateExpressionUsage(false);
processor.setGenerateWriteFailure(true, 1);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
@@ -401,7 +389,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.RECORDS_PER_TXN, "2");
runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
runner.setValidateExpressionUsage(false);
// The first two records are committed, then an issue will happen at the 3rd record.
processor.setGenerateWriteFailure(true, 2);
Map<String, Object> user1 = new HashMap<String, Object>() {
@@ -481,7 +468,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, "favorite_number, favorite_color");
runner.setProperty(PutHiveStreaming.AUTOCREATE_PARTITIONS, "true");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
@@ -507,7 +493,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, "favorite_food");
runner.setProperty(PutHiveStreaming.AUTOCREATE_PARTITIONS, "false");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
@@ -533,7 +518,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, "favorite_food");
runner.setProperty(PutHiveStreaming.AUTOCREATE_PARTITIONS, "false");
runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
@@ -563,7 +547,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "2");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
@@ -587,7 +570,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setProperty(PutHiveStreaming.HEARTBEAT_INTERVAL, "1");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
@@ -613,7 +595,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
@@ -636,7 +617,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
@@ -665,7 +645,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
@@ -686,7 +665,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
@@ -711,7 +689,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
@@ -739,7 +716,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
@@ -772,7 +748,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
@@ -794,7 +769,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
@@ -822,7 +796,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
@@ -845,7 +818,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
@@ -874,7 +846,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
@@ -897,7 +868,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setProperty(PutHiveStreaming.ROLLBACK_ON_FAILURE, "true");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");
@@ -926,7 +896,6 @@ public class TestPutHiveStreaming {
runner.setProperty(PutHiveStreaming.DB_NAME, "default");
runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
runner.setValidateExpressionUsage(false);
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");