mirror of https://github.com/apache/nifi.git
NIFI-3867: Add Expression Language support to HiveConnectionPool properties
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
This closes #1783.
This commit is contained in:
parent bc68eb754f
commit 3353865ce9
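At a glance, the patch applies one pattern to each connection-pool property: declare the descriptor with expressionLanguageSupported(true), then resolve the configured value with evaluateAttributeExpressions() before using it. Below is a minimal sketch of that pattern, assuming the NiFi 1.x controller-service API is on the classpath; ExampleService and example-url are illustrative names, not part of this patch.

    import org.apache.nifi.annotation.lifecycle.OnEnabled;
    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.controller.AbstractControllerService;
    import org.apache.nifi.controller.ConfigurationContext;
    import org.apache.nifi.processor.util.StandardValidators;

    public class ExampleService extends AbstractControllerService {

        // Hypothetical property; mirrors the descriptors touched by this commit.
        static final PropertyDescriptor EXAMPLE_URL = new PropertyDescriptor.Builder()
                .name("example-url")
                .description("Connection URL; may reference variables such as ${hive.url}.")
                .required(true)
                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                .expressionLanguageSupported(true) // opt the property into Expression Language
                .build();

        @OnEnabled
        public void onConfigured(final ConfigurationContext context) {
            // Resolve any ${...} expressions (e.g. variable registry entries) before use.
            final String url = context.getProperty(EXAMPLE_URL).evaluateAttributeExpressions().getValue();
            getLogger().info("Resolved URL: {}", new Object[]{url});
        }
    }

Concrete values such as ${url} or ${username} are then supplied through the variable registry, as the new unit test at the end of this diff demonstrates.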
HiveConnectionPool.java

@@ -75,6 +75,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
             .defaultValue(null)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .required(true)
+            .expressionLanguageSupported(true)
             .build();

     public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
@@ -83,7 +84,10 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
             .description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop "
                     + "will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication "
                     + "with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.")
-            .required(false).addValidator(HiveUtils.createMultipleFilesExistValidator()).build();
+            .required(false)
+            .addValidator(HiveUtils.createMultipleFilesExistValidator())
+            .expressionLanguageSupported(true)
+            .build();

     public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
             .name("hive-db-user")
@@ -91,6 +95,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
             .description("Database user name")
             .defaultValue(null)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();

     public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder()
@@ -101,6 +106,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
             .required(false)
             .sensitive(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();

     public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder()
@@ -111,7 +117,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
             .defaultValue("500 millis")
             .required(true)
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .sensitive(false)
+            .expressionLanguageSupported(true)
             .build();

     public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder()
@@ -122,7 +128,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
             .defaultValue("8")
             .required(true)
             .addValidator(StandardValidators.INTEGER_VALIDATOR)
-            .sensitive(false)
+            .expressionLanguageSupported(true)
             .build();

     public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder()
@@ -183,7 +189,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
         final List<ValidationResult> problems = new ArrayList<>();

         if (confFileProvided) {
-            final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
+            final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
             final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
             final String keyTab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
             problems.addAll(hiveConfigurator.validate(configFiles, principal, keyTab, validationResourceHolder, getLogger()));
@@ -211,7 +217,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
         ComponentLog log = getLogger();

-        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
+        final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
         final Configuration hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
         final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();

@@ -219,7 +225,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
         for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
             final PropertyDescriptor descriptor = entry.getKey();
             if (descriptor.isDynamic()) {
-                hiveConfig.set(descriptor.getName(), entry.getValue());
+                hiveConfig.set(descriptor.getName(), context.getProperty(descriptor).evaluateAttributeExpressions().getValue());
             }
         }

@@ -237,15 +243,15 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
             getLogger().info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab});

         }
-        final String user = context.getProperty(DB_USER).getValue();
-        final String passw = context.getProperty(DB_PASSWORD).getValue();
-        final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
-        final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger();
+        final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
+        final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
+        final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();

         dataSource = new BasicDataSource();
         dataSource.setDriverClassName(drv);

-        final String dburl = context.getProperty(DATABASE_URL).getValue();
+        final String dburl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();

         dataSource.setMaxWait(maxWaitMillis);
         dataSource.setMaxActive(maxTotal);
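Note that evaluateAttributeExpressions() returns a PropertyValue, so the existing conversions still apply after resolution: MAX_WAIT_TIME and MAX_TOTAL_CONNECTIONS above resolve the expression first and only then call asTimePeriod() and asInteger() on the result.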
HiveUtils.java

@@ -57,6 +57,9 @@ public class HiveUtils {
      */
     public static Validator createMultipleFilesExistValidator() {
         return (subject, input, context) -> {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+            }
             final String[] files = input.split("\\s*,\\s*");
             for (String filename : files) {
                 try {
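With this change the file-existence validator short-circuits whenever the property supports Expression Language and the supplied value contains an expression (for example the ${hiveconf} value used in the new test below), returning a valid result with the explanation "Expression Language Present"; the resolved paths are still checked at validation and configuration time via hiveConfigurator.validate() and getConfigurationFromFiles(), as shown in the HiveConnectionPool hunks above.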
HiveConnectionPoolTest.java

@@ -19,9 +19,13 @@ package org.apache.nifi.dbcp.hive;

 import org.apache.commons.dbcp.BasicDataSource;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.registry.VariableDescriptor;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockVariableRegistry;
 import org.junit.Before;
 import org.junit.Test;

@@ -30,6 +34,8 @@ import java.lang.reflect.Field;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.security.PrivilegedExceptionAction;
 import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;

 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.isA;
@@ -87,4 +93,45 @@ public class HiveConnectionPoolTest {
             throw e;
         }
     }

+    @Test
+    public void testExpressionLanguageSupport() throws Exception {
+        final String URL = "jdbc:hive2://localhost:10000/default";
+        final String USER = "user";
+        final String PASS = "pass";
+        final int MAX_CONN = 7;
+        final String MAX_WAIT = "10 sec"; // 10000 milliseconds
+        final String CONF = "/path/to/hive-site.xml";
+        hiveConnectionPool = new HiveConnectionPool();
+
+        Map<PropertyDescriptor, String> props = new HashMap<PropertyDescriptor, String>() {{
+            put(HiveConnectionPool.DATABASE_URL, "${url}");
+            put(HiveConnectionPool.DB_USER, "${username}");
+            put(HiveConnectionPool.DB_PASSWORD, "${password}");
+            put(HiveConnectionPool.MAX_TOTAL_CONNECTIONS, "${maxconn}");
+            put(HiveConnectionPool.MAX_WAIT_TIME, "${maxwait}");
+            put(HiveConnectionPool.HIVE_CONFIGURATION_RESOURCES, "${hiveconf}");
+        }};
+
+        MockVariableRegistry registry = new MockVariableRegistry();
+        registry.setVariable(new VariableDescriptor("url"), URL);
+        registry.setVariable(new VariableDescriptor("username"), USER);
+        registry.setVariable(new VariableDescriptor("password"), PASS);
+        registry.setVariable(new VariableDescriptor("maxconn"), Integer.toString(MAX_CONN));
+        registry.setVariable(new VariableDescriptor("maxwait"), MAX_WAIT);
+        registry.setVariable(new VariableDescriptor("hiveconf"), CONF);
+
+
+        MockConfigurationContext context = new MockConfigurationContext(props, null, registry);
+        hiveConnectionPool.onConfigured(context);
+
+        Field dataSourceField = HiveConnectionPool.class.getDeclaredField("dataSource");
+        dataSourceField.setAccessible(true);
+        basicDataSource = (BasicDataSource) dataSourceField.get(hiveConnectionPool);
+        assertEquals(URL, basicDataSource.getUrl());
+        assertEquals(USER, basicDataSource.getUsername());
+        assertEquals(PASS, basicDataSource.getPassword());
+        assertEquals(MAX_CONN, basicDataSource.getMaxActive());
+        assertEquals(10000L, basicDataSource.getMaxWait());
+    }
 }