NIFI-5790: Exposes 6 commons-dbcp options in DBCPConnectionPool

Signed-off-by: Peter Wicks <patricker@gmail.com>

This Closes #3133
This commit is contained in:
Colin Dean 2018-11-05 16:18:21 -05:00 committed by Peter Wicks
parent 63f55d05b4
commit a628aced6b
No known key found for this signature in database
GPG Key ID: 79ABE9BA9C7AB3CD
2 changed files with 275 additions and 1 deletions

View File

@ -17,12 +17,14 @@
package org.apache.nifi.dbcp;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
@ -59,6 +61,32 @@ import java.util.regex.Pattern;
+ "Note that no flow file input (attributes, e.g.) is available for use in Expression Language constructs for these properties.")
public class DBCPConnectionPool extends AbstractControllerService implements DBCPService {
/**
* Copied from {@link GenericObjectPoolConfig.DEFAULT_MIN_IDLE} in Commons-DBCP 2.5.0
*/
private static final String DEFAULT_MIN_IDLE = "0";
/**
* Copied from {@link GenericObjectPoolConfig.DEFAULT_MAX_IDLE} in Commons-DBCP 2.5.0
*/
private static final String DEFAULT_MAX_IDLE = "8";
/**
* Copied from private variable {@link BasicDataSource.maxConnLifetimeMillis} in Commons-DBCP 2.5.0
*/
private static final String DEFAULT_MAX_CONN_LIFETIME = "-1";
/**
* Copied from {@link GenericObjectPoolConfig.DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS} in Commons-DBCP 2.5.0
*/
private static final String DEFAULT_EVICTION_RUN_PERIOD = String.valueOf(-1L);
/**
* Copied from {@link GenericObjectPoolConfig.DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS} in Commons-DBCP 2.5.0
* and converted from 1800000L to "1800000 millis" to "30 mins"
*/
private static final String DEFAULT_MIN_EVICTABLE_IDLE_TIME = "30 mins";
/**
* Copied from {@link GenericObjectPoolConfig.DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS} in Commons-DBCP 2.5.0
*/
private static final String DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME = String.valueOf(-1L);
private static final Validator CUSTOM_TIME_PERIOD_VALIDATOR = new Validator() {
private final Pattern TIME_DURATION_PATTERN = Pattern.compile(FormatUtils.TIME_DURATION_REGEX);
@ -164,6 +192,77 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder()
.displayName("Minimum Idle Connections")
.name("dbcp-mim-idle-conns")
.description("The minimum number of connections that can remain idle in the pool, without extra ones being " +
"created, or zero to create none.")
.defaultValue(DEFAULT_MIN_IDLE)
.required(false)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MAX_IDLE = new PropertyDescriptor.Builder()
.displayName("Max Idle Connections")
.name("dbcp-max-idle-conns")
.description("The maximum number of connections that can remain idle in the pool, without extra ones being " +
"released, or negative for no limit.")
.defaultValue(DEFAULT_MAX_IDLE)
.required(false)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder()
.displayName("Max Connection Lifetime")
.name("dbcp-max-conn-lifetime")
.description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " +
"connection will fail the next activation, passivation or validation test. A value of zero or less " +
"means the connection has an infinite lifetime.")
.defaultValue(DEFAULT_MAX_CONN_LIFETIME)
.required(false)
.addValidator(CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor EVICTION_RUN_PERIOD = new PropertyDescriptor.Builder()
.displayName("Time Between Eviction Runs")
.name("dbcp-time-between-eviction-runs")
.description("The number of milliseconds to sleep between runs of the idle connection evictor thread. When " +
"non-positive, no idle connection evictor thread will be run.")
.defaultValue(DEFAULT_EVICTION_RUN_PERIOD)
.required(false)
.addValidator(CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder()
.displayName("Minimum Evictable Idle Time")
.name("dbcp-min-evictable-idle-time")
.description("The minimum amount of time a connection may sit idle in the pool before it is eligible for eviction.")
.defaultValue(DEFAULT_MIN_EVICTABLE_IDLE_TIME)
.required(false)
.addValidator(CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor SOFT_MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder()
.displayName("Soft Minimum Evictable Idle Time")
.name("dbcp-soft-min-evictable-idle-time")
.description("The minimum amount of time a connection may sit idle in the pool before it is eligible for " +
"eviction by the idle connection evictor, with the extra condition that at least a minimum number of" +
" idle connections remain in the pool. When the not-soft version of this option is set to a positive" +
" value, it is examined first by the idle connection evictor: when idle connections are visited by " +
"the evictor, idle time is first compared against it (without considering the number of idle " +
"connections in the pool) and then against this soft option, including the minimum idle connections " +
"constraint.")
.defaultValue(DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME)
.required(false)
.addValidator(CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
private static final List<PropertyDescriptor> properties;
static {
@ -176,6 +275,12 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
props.add(MAX_WAIT_TIME);
props.add(MAX_TOTAL_CONNECTIONS);
props.add(VALIDATION_QUERY);
props.add(MIN_IDLE);
props.add(MAX_IDLE);
props.add(MAX_CONN_LIFETIME);
props.add(EVICTION_RUN_PERIOD);
props.add(MIN_EVICTABLE_IDLE_TIME);
props.add(SOFT_MIN_EVICTABLE_IDLE_TIME);
properties = Collections.unmodifiableList(props);
}
@ -221,7 +326,13 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger();
final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
final Long maxWaitMillis = "-1".equals(context.getProperty(MAX_WAIT_TIME).getValue()) ? -1 : context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
final Long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME));
final Integer minIdle = context.getProperty(MIN_IDLE).asInteger();
final Integer maxIdle = context.getProperty(MAX_IDLE).asInteger();
final Long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME));
final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD));
final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME));
final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME));
dataSource = new BasicDataSource();
dataSource.setDriverClassName(drv);
@ -234,6 +345,12 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
dataSource.setMaxWaitMillis(maxWaitMillis);
dataSource.setMaxTotal(maxTotal);
dataSource.setMinIdle(minIdle);
dataSource.setMaxIdle(maxIdle);
dataSource.setMaxConnLifetimeMillis(maxConnLifetimeMillis);
dataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
dataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
dataSource.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis);
if (validationQuery!=null && !validationQuery.isEmpty()) {
dataSource.setValidationQuery(validationQuery);
@ -250,6 +367,10 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
}
private Long extractMillisWithInfinite(PropertyValue prop) {
return "-1".equals(prop.getValue()) ? -1 : prop.asTimePeriod(TimeUnit.MILLISECONDS);
}
/**
* using Thread.currentThread().getContextClassLoader(); will ensure that you are using the ClassLoader for you NAR.
*
@ -314,4 +435,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
return "DBCPConnectionPool[id=" + getIdentifier() + "]";
}
BasicDataSource getDataSource() {
return dataSource;
}
}

View File

@ -41,6 +41,7 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@ -93,6 +94,155 @@ public class DBCPServiceTest {
runner.assertValid(service);
}
/**
* Checks validity of idle limit and time settings including a default
*/
@Test
public void testIdleConnectionsSettings() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final DBCPConnectionPool service = new DBCPConnectionPool();
runner.addControllerService("test-good1", service);
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
// set embedded Derby database connection url
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true");
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp");
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver");
runner.setProperty(service, DBCPConnectionPool.MAX_WAIT_TIME, "-1");
runner.setProperty(service, DBCPConnectionPool.MAX_IDLE, "2");
runner.setProperty(service, DBCPConnectionPool.MAX_CONN_LIFETIME, "1 secs");
runner.setProperty(service, DBCPConnectionPool.EVICTION_RUN_PERIOD, "1 secs");
runner.setProperty(service, DBCPConnectionPool.MIN_EVICTABLE_IDLE_TIME, "1 secs");
runner.setProperty(service, DBCPConnectionPool.SOFT_MIN_EVICTABLE_IDLE_TIME, "1 secs");
runner.enableControllerService(service);
runner.assertValid(service);
}
@Test
public void testMinIdleCannotBeNegative() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final DBCPConnectionPool service = new DBCPConnectionPool();
runner.addControllerService("test-good1", service);
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
// set embedded Derby database connection url
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true");
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp");
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver");
runner.setProperty(service, DBCPConnectionPool.MAX_WAIT_TIME, "-1");
runner.setProperty(service, DBCPConnectionPool.MIN_IDLE, "-1");
runner.assertNotValid(service);
}
/**
* Checks to ensure that settings have been passed down into the DBCP
*/
@Test
public void testIdleSettingsAreSet() throws InitializationException, SQLException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final DBCPConnectionPool service = new DBCPConnectionPool();
runner.addControllerService("test-good1", service);
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
// set embedded Derby database connection url
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true");
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp");
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver");
runner.setProperty(service, DBCPConnectionPool.MAX_WAIT_TIME, "-1");
runner.setProperty(service, DBCPConnectionPool.MAX_IDLE, "6");
runner.setProperty(service, DBCPConnectionPool.MIN_IDLE, "4");
runner.setProperty(service, DBCPConnectionPool.MAX_CONN_LIFETIME, "1 secs");
runner.setProperty(service, DBCPConnectionPool.EVICTION_RUN_PERIOD, "1 secs");
runner.setProperty(service, DBCPConnectionPool.MIN_EVICTABLE_IDLE_TIME, "1 secs");
runner.setProperty(service, DBCPConnectionPool.SOFT_MIN_EVICTABLE_IDLE_TIME, "1 secs");
runner.enableControllerService(service);
Assert.assertEquals(6, service.getDataSource().getMaxIdle());
Assert.assertEquals(4, service.getDataSource().getMinIdle());
Assert.assertEquals(1000, service.getDataSource().getMaxConnLifetimeMillis());
Assert.assertEquals(1000, service.getDataSource().getTimeBetweenEvictionRunsMillis());
Assert.assertEquals(1000, service.getDataSource().getMinEvictableIdleTimeMillis());
Assert.assertEquals(1000, service.getDataSource().getSoftMinEvictableIdleTimeMillis());
service.getDataSource().close();
}
/**
* Creates a few connections and step closes them to see what happens
*/
@Test
public void testIdle() throws InitializationException, SQLException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final DBCPConnectionPool service = new DBCPConnectionPool();
runner.addControllerService("test-good1", service);
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
// set embedded Derby database connection url
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true");
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp");
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver");
runner.setProperty(service, DBCPConnectionPool.MAX_WAIT_TIME, "-1");
runner.setProperty(service, DBCPConnectionPool.MAX_IDLE, "4");
runner.setProperty(service, DBCPConnectionPool.MIN_IDLE, "1");
runner.setProperty(service, DBCPConnectionPool.MAX_CONN_LIFETIME, "1000 millis");
runner.setProperty(service, DBCPConnectionPool.EVICTION_RUN_PERIOD, "100 millis");
runner.setProperty(service, DBCPConnectionPool.MIN_EVICTABLE_IDLE_TIME, "100 millis");
runner.setProperty(service, DBCPConnectionPool.SOFT_MIN_EVICTABLE_IDLE_TIME, "100 millis");
runner.enableControllerService(service);
ArrayList<Connection> connections = new ArrayList<>();
for (int i = 0; i < 6; i++) {
connections.add(service.getConnection());
}
Assert.assertEquals(6, service.getDataSource().getNumActive());
connections.get(0).close();
Assert.assertEquals(5, service.getDataSource().getNumActive());
Assert.assertEquals(1, service.getDataSource().getNumIdle());
connections.get(1).close();
connections.get(2).close();
connections.get(3).close();
//now at max idle
Assert.assertEquals(2, service.getDataSource().getNumActive());
Assert.assertEquals(4, service.getDataSource().getNumIdle());
//now a connection should get closed for real so that numIdle does not exceed maxIdle
connections.get(4).close();
Assert.assertEquals(4, service.getDataSource().getNumIdle());
Assert.assertEquals(1, service.getDataSource().getNumActive());
connections.get(5).close();
Assert.assertEquals(4, service.getDataSource().getNumIdle());
Assert.assertEquals(0, service.getDataSource().getNumActive());
Thread.sleep(500);
Assert.assertEquals(1, service.getDataSource().getNumIdle());
service.getDataSource().close();
}
/**
* Test database connection using Derby. Connect, create table, insert, select, drop table.
*