diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
index 64f3027749..c3724c3c0f 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
@@ -75,6 +75,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
             .defaultValue(null)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .required(true)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
@@ -83,7 +84,10 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
             .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
                     + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
                     + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
-            .required(false).addValidator(HiveUtils.createMultipleFilesExistValidator()).build();
+            .required(false)
+            .addValidator(HiveUtils.createMultipleFilesExistValidator())
+            .expressionLanguageSupported(true)
+            .build();
 
     public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
             .name("hive-db-user")
@@ -91,6 +95,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
             .description("Database user name")
             .defaultValue(null)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder()
@@ -101,6 +106,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
             .required(false)
             .sensitive(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder()
@@ -111,7 +117,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
             .defaultValue("500 millis")
             .required(true)
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .sensitive(false)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder()
@@ -122,7 +128,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
             .defaultValue("8")
             .required(true)
             .addValidator(StandardValidators.INTEGER_VALIDATOR)
-            .sensitive(false)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder()
@@ -183,7 +189,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
         final List<ValidationResult> problems = new ArrayList<>();
 
         if (confFileProvided) {
-            final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
+            final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
             final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
             final String keyTab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
             problems.addAll(hiveConfigurator.validate(configFiles, principal, keyTab, validationResourceHolder, getLogger()));
@@ -211,7 +217,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
 
         ComponentLog log = getLogger();
 
-        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
+        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
         final Configuration hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
         final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
 
@@ -219,7 +225,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
         for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
             final PropertyDescriptor descriptor = entry.getKey();
             if (descriptor.isDynamic()) {
-                hiveConfig.set(descriptor.getName(), entry.getValue());
+                hiveConfig.set(descriptor.getName(), context.getProperty(descriptor).evaluateAttributeExpressions().getValue());
             }
         }
 
@@ -237,15 +243,15 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
             getLogger().info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab});
         }
 
-        final String user = context.getProperty(DB_USER).getValue();
-        final String passw = context.getProperty(DB_PASSWORD).getValue();
-        final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
-        final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger();
+        final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
+        final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
+        final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
 
         dataSource = new BasicDataSource();
         dataSource.setDriverClassName(drv);
 
-        final String dburl = context.getProperty(DATABASE_URL).getValue();
+        final String dburl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
 
         dataSource.setMaxWait(maxWaitMillis);
         dataSource.setMaxActive(maxTotal);
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java
index 3e375f9121..2dc67f702f 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java
@@ -57,6 +57,9 @@ public class HiveUtils {
      */
     public static Validator createMultipleFilesExistValidator() {
         return (subject, input, context) -> {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+            }
             final String[] files = input.split("\\s*,\\s*");
             for (String filename : files) {
                 try {
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java
index 0b5cd8f64a..79bcb7afd7 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java
@@ -19,9 +19,13 @@ package org.apache.nifi.dbcp.hive;
 
 import org.apache.commons.dbcp.BasicDataSource;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.registry.VariableDescriptor;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockVariableRegistry;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -30,6 +34,8 @@ import java.lang.reflect.Field;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;
 import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.isA;
@@ -51,7 +57,7 @@ public class HiveConnectionPoolTest {
         when(userGroupInformation.doAs(isA(PrivilegedExceptionAction.class))).thenAnswer(invocation -> {
             try {
                 return ((PrivilegedExceptionAction) invocation.getArguments()[0]).run();
-            } catch (IOException |Error|RuntimeException|InterruptedException e) {
+            } catch (IOException | Error | RuntimeException | InterruptedException e) {
                 throw e;
             } catch (Throwable e) {
                 throw new UndeclaredThrowableException(e);
@@ -87,4 +93,45 @@ public class HiveConnectionPoolTest {
             throw e;
         }
     }
+
+    @Test
+    public void testExpressionLanguageSupport() throws Exception {
+        final String URL = "jdbc:hive2://localhost:10000/default";
+        final String USER = "user";
+        final String PASS = "pass";
+        final int MAX_CONN = 7;
+        final String MAX_WAIT = "10 sec"; // 10000 milliseconds
+        final String CONF = "/path/to/hive-site.xml";
+        hiveConnectionPool = new HiveConnectionPool();
+
+        Map<PropertyDescriptor, String> props = new HashMap<PropertyDescriptor, String>() {{
+            put(HiveConnectionPool.DATABASE_URL, "${url}");
+            put(HiveConnectionPool.DB_USER, "${username}");
+            put(HiveConnectionPool.DB_PASSWORD, "${password}");
+            put(HiveConnectionPool.MAX_TOTAL_CONNECTIONS, "${maxconn}");
+            put(HiveConnectionPool.MAX_WAIT_TIME, "${maxwait}");
+            put(HiveConnectionPool.HIVE_CONFIGURATION_RESOURCES, "${hiveconf}");
+        }};
+
+        MockVariableRegistry registry = new MockVariableRegistry();
+        registry.setVariable(new VariableDescriptor("url"), URL);
+        registry.setVariable(new VariableDescriptor("username"), USER);
+        registry.setVariable(new VariableDescriptor("password"), PASS);
+        registry.setVariable(new VariableDescriptor("maxconn"), Integer.toString(MAX_CONN));
+        registry.setVariable(new VariableDescriptor("maxwait"), MAX_WAIT);
+        registry.setVariable(new VariableDescriptor("hiveconf"), CONF);
+
+
+        MockConfigurationContext context = new MockConfigurationContext(props, null, registry);
+        hiveConnectionPool.onConfigured(context);
+
+        Field dataSourceField = HiveConnectionPool.class.getDeclaredField("dataSource");
+        dataSourceField.setAccessible(true);
+        basicDataSource = (BasicDataSource) dataSourceField.get(hiveConnectionPool);
+        assertEquals(URL, basicDataSource.getUrl());
+        assertEquals(USER, basicDataSource.getUsername());
+        assertEquals(PASS, basicDataSource.getPassword());
+        assertEquals(MAX_CONN, basicDataSource.getMaxActive());
+        assertEquals(10000L, basicDataSource.getMaxWait());
+    }
 }