NIFI-6684 Add more property to Hive3ConnectionPool (#3763)

* NiFi-6684 Add more property to Hive3ConnectionPool

signed-off by: Peter Wicks <patricker@gmail.com>
This commit is contained in:
James Cheng 2019-10-03 21:24:29 +08:00 committed by Peter Wicks
parent b12a9ad446
commit f1d35f46ce
6 changed files with 197 additions and 44 deletions

View File

@ -135,6 +135,11 @@
<version>1.10.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>

View File

@ -18,7 +18,8 @@ package org.apache.nifi.dbcp.hive;
import java.io.File;
import org.apache.commons.dbcp.BasicDataSource;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.jdbc.HiveDriver;
@ -28,10 +29,12 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.dbcp.DBCPValidator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
@ -56,6 +59,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
@ -68,6 +72,31 @@ import org.apache.nifi.controller.ControllerServiceInitializationContext;
@CapabilityDescription("Provides Database Connection Pooling Service for Apache Hive 3.x. Connections can be asked from pool and returned after usage.")
public class Hive3ConnectionPool extends AbstractControllerService implements Hive3DBCPService {
private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
/**
* Copied from {@link GenericObjectPoolConfig.DEFAULT_MIN_IDLE} in Commons-DBCP 2.5.0
*/
private static final String DEFAULT_MIN_IDLE = "0";
/**
* Copied from {@link GenericObjectPoolConfig.DEFAULT_MAX_IDLE} in Commons-DBCP 2.5.0
*/
private static final String DEFAULT_MAX_IDLE = "8";
/**
* Copied from private variable {@link BasicDataSource.maxConnLifetimeMillis} in Commons-DBCP 2.5.0
*/
private static final String DEFAULT_MAX_CONN_LIFETIME = "-1";
/**
* Copied from {@link GenericObjectPoolConfig.DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS} in Commons-DBCP 2.5.0
*/
private static final String DEFAULT_EVICTION_RUN_PERIOD = String.valueOf(-1L);
/**
* Copied from {@link GenericObjectPoolConfig.DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS} in Commons-DBCP 2.5.0
* and converted from 1800000L to "1800000 millis" to "30 mins"
*/
private static final String DEFAULT_MIN_EVICTABLE_IDLE_TIME = "30 mins";
/**
* Copied from {@link GenericObjectPoolConfig.DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS} in Commons-DBCP 2.5.0
*/
private static final String DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME = String.valueOf(-1L);
static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
.name("hive-db-connect-url")
@ -145,6 +174,77 @@ public class Hive3ConnectionPool extends AbstractControllerService implements Hi
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder()
.displayName("Minimum Idle Connections")
.name("dbcp-min-idle-conns")
.description("The minimum number of connections that can remain idle in the pool, without extra ones being " +
"created, or zero to create none.")
.defaultValue(DEFAULT_MIN_IDLE)
.required(false)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MAX_IDLE = new PropertyDescriptor.Builder()
.displayName("Max Idle Connections")
.name("dbcp-max-idle-conns")
.description("The maximum number of connections that can remain idle in the pool, without extra ones being " +
"released, or negative for no limit.")
.defaultValue(DEFAULT_MAX_IDLE)
.required(false)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder()
.displayName("Max Connection Lifetime")
.name("dbcp-max-conn-lifetime")
.description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " +
"connection will fail the next activation, passivation or validation test. A value of zero or less " +
"means the connection has an infinite lifetime.")
.defaultValue(DEFAULT_MAX_CONN_LIFETIME)
.required(false)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor EVICTION_RUN_PERIOD = new PropertyDescriptor.Builder()
.displayName("Time Between Eviction Runs")
.name("dbcp-time-between-eviction-runs")
.description("The number of milliseconds to sleep between runs of the idle connection evictor thread. When " +
"non-positive, no idle connection evictor thread will be run.")
.defaultValue(DEFAULT_EVICTION_RUN_PERIOD)
.required(false)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder()
.displayName("Minimum Evictable Idle Time")
.name("dbcp-min-evictable-idle-time")
.description("The minimum amount of time a connection may sit idle in the pool before it is eligible for eviction.")
.defaultValue(DEFAULT_MIN_EVICTABLE_IDLE_TIME)
.required(false)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor SOFT_MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder()
.displayName("Soft Minimum Evictable Idle Time")
.name("dbcp-soft-min-evictable-idle-time")
.description("The minimum amount of time a connection may sit idle in the pool before it is eligible for " +
"eviction by the idle connection evictor, with the extra condition that at least a minimum number of" +
" idle connections remain in the pool. When the not-soft version of this option is set to a positive" +
" value, it is examined first by the idle connection evictor: when idle connections are visited by " +
"the evictor, idle time is first compared against it (without considering the number of idle " +
"connections in the pool) and then against this soft option, including the minimum idle connections " +
"constraint.")
.defaultValue(DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME)
.required(false)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
private static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
@ -178,6 +278,12 @@ public class Hive3ConnectionPool extends AbstractControllerService implements Hi
props.add(MAX_WAIT_TIME);
props.add(MAX_TOTAL_CONNECTIONS);
props.add(VALIDATION_QUERY);
props.add(MIN_IDLE);
props.add(MAX_IDLE);
props.add(MAX_CONN_LIFETIME);
props.add(EVICTION_RUN_PERIOD);
props.add(MIN_EVICTABLE_IDLE_TIME);
props.add(SOFT_MIN_EVICTABLE_IDLE_TIME);
props.add(KERBEROS_CREDENTIALS_SERVICE);
kerberosConfigFile = context.getKerberosConfigurationFile();
@ -321,15 +427,18 @@ public class Hive3ConnectionPool extends AbstractControllerService implements Hi
final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
final Integer minIdle = context.getProperty(MIN_IDLE).asInteger();
final Integer maxIdle = context.getProperty(MAX_IDLE).asInteger();
final Long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME));
final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD));
final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME));
final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME));
dataSource = new BasicDataSource();
dataSource.setDriverClassName(drv);
connectionUrl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
dataSource.setMaxWait(maxWaitMillis);
dataSource.setMaxActive(maxTotal);
if (validationQuery != null && !validationQuery.isEmpty()) {
dataSource.setValidationQuery(validationQuery);
dataSource.setTestOnBorrow(true);
@ -338,6 +447,18 @@ public class Hive3ConnectionPool extends AbstractControllerService implements Hi
dataSource.setUrl(connectionUrl);
dataSource.setUsername(user);
dataSource.setPassword(passw);
dataSource.setMaxWaitMillis(maxWaitMillis);
dataSource.setMaxTotal(maxTotal);
dataSource.setMinIdle(minIdle);
dataSource.setMaxIdle(maxIdle);
dataSource.setMaxConnLifetimeMillis(maxConnLifetimeMillis);
dataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
dataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
dataSource.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis);
}
private Long extractMillisWithInfinite(PropertyValue prop) {
return "-1".equals(prop.getValue()) ? -1 : prop.asTimePeriod(TimeUnit.MILLISECONDS);
}
/**

View File

@ -31,7 +31,7 @@ import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.dbcp.BasicDataSource;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
@ -162,8 +162,8 @@ public class Hive3ConnectionPoolTest {
assertEquals(URL, basicDataSource.getUrl());
assertEquals(USER, basicDataSource.getUsername());
assertEquals(PASS, basicDataSource.getPassword());
assertEquals(MAX_CONN, basicDataSource.getMaxActive());
assertEquals(10000L, basicDataSource.getMaxWait());
assertEquals(MAX_CONN, basicDataSource.getMaxTotal());
assertEquals(10000L, basicDataSource.getMaxWaitMillis());
assertEquals(URL, hive3ConnectionPool.getConnectionURL());
}

View File

@ -27,5 +27,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.10.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dbcp;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.util.FormatUtils;
import java.util.regex.Pattern;
public class DBCPValidator {
public static final Validator CUSTOM_TIME_PERIOD_VALIDATOR = new Validator() {
private final Pattern TIME_DURATION_PATTERN = Pattern.compile(FormatUtils.TIME_DURATION_REGEX);
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
if (input == null) {
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Time Period cannot be null").build();
}
if (TIME_DURATION_PATTERN.matcher(input.toLowerCase()).matches() || input.equals("-1")) {
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
} else {
return new ValidationResult.Builder()
.subject(subject)
.input(input)
.valid(false)
.explanation("Must be of format <duration> <TimeUnit> where <duration> is a "
+ "non-negative integer and TimeUnit is a supported Time Unit, such "
+ "as: nanos, millis, secs, mins, hrs, days")
.build();
}
}
};
}

View File

@ -25,9 +25,6 @@ import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.AttributeExpression;
@ -38,7 +35,6 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import javax.security.auth.login.LoginException;
@ -51,7 +47,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
/**
* Implementation of for Database Connection Pooling Service. Apache DBCP is used for connection pooling functionality.
@ -91,33 +86,6 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
*/
private static final String DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME = String.valueOf(-1L);
private static final Validator CUSTOM_TIME_PERIOD_VALIDATOR = new Validator() {
private final Pattern TIME_DURATION_PATTERN = Pattern.compile(FormatUtils.TIME_DURATION_REGEX);
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
if (input == null) {
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Time Period cannot be null").build();
}
if (TIME_DURATION_PATTERN.matcher(input.toLowerCase()).matches() || input.equals("-1")) {
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
} else {
return new ValidationResult.Builder()
.subject(subject)
.input(input)
.valid(false)
.explanation("Must be of format <duration> <TimeUnit> where <duration> is a "
+ "non-negative integer and TimeUnit is a supported Time Unit, such "
+ "as: nanos, millis, secs, mins, hrs, days")
.build();
}
}
};
public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
.name("Database Connection URL")
.description("A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters."
@ -171,7 +139,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
+ " for a connection to be returned before failing, or -1 to wait indefinitely. ")
.defaultValue("500 millis")
.required(true)
.addValidator(CUSTOM_TIME_PERIOD_VALIDATOR)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.sensitive(false)
.build();
@ -226,7 +194,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
"means the connection has an infinite lifetime.")
.defaultValue(DEFAULT_MAX_CONN_LIFETIME)
.required(false)
.addValidator(CUSTOM_TIME_PERIOD_VALIDATOR)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
@ -237,7 +205,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
"non-positive, no idle connection evictor thread will be run.")
.defaultValue(DEFAULT_EVICTION_RUN_PERIOD)
.required(false)
.addValidator(CUSTOM_TIME_PERIOD_VALIDATOR)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
@ -247,7 +215,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
.description("The minimum amount of time a connection may sit idle in the pool before it is eligible for eviction.")
.defaultValue(DEFAULT_MIN_EVICTABLE_IDLE_TIME)
.required(false)
.addValidator(CUSTOM_TIME_PERIOD_VALIDATOR)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
@ -263,7 +231,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
"constraint.")
.defaultValue(DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME)
.required(false)
.addValidator(CUSTOM_TIME_PERIOD_VALIDATOR)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();