NIFI-5588 - Fix max wait time in DBCP Connection Pool

This closes #3022

Signed-off-by: zenfenan <zenfenan@apache.org>
This commit is contained in:
Pierre Villard 2018-09-23 22:00:36 +02:00 committed by zenfenan
parent 2a964681ec
commit c4d3b5e94f
2 changed files with 59 additions and 3 deletions

View File

@ -23,6 +23,9 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.AttributeExpression;
@ -30,6 +33,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import java.net.MalformedURLException;
@ -41,6 +45,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
/**
* Implementation of for Database Connection Pooling Service. Apache DBCP is used for connection pooling functionality.
@ -54,6 +59,33 @@ import java.util.concurrent.TimeUnit;
+ "Note that no flow file input (attributes, e.g.) is available for use in Expression Language constructs for these properties.")
public class DBCPConnectionPool extends AbstractControllerService implements DBCPService {
private static final Validator CUSTOM_TIME_PERIOD_VALIDATOR = new Validator() {
private final Pattern TIME_DURATION_PATTERN = Pattern.compile(FormatUtils.TIME_DURATION_REGEX);
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
if (input == null) {
return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Time Period cannot be null").build();
}
if (TIME_DURATION_PATTERN.matcher(input.toLowerCase()).matches() || input.equals("-1")) {
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
} else {
return new ValidationResult.Builder()
.subject(subject)
.input(input)
.valid(false)
.explanation("Must be of format <duration> <TimeUnit> where <duration> is a "
+ "non-negative integer and TimeUnit is a supported Time Unit, such "
+ "as: nanos, millis, secs, mins, hrs, days")
.build();
}
}
};
public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
.name("Database Connection URL")
.description("A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters."
@ -107,7 +139,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
+ " for a connection to be returned before failing, or -1 to wait indefinitely. ")
.defaultValue("500 millis")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.addValidator(CUSTOM_TIME_PERIOD_VALIDATOR)
.sensitive(false)
.build();
@ -187,9 +219,9 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
final String drv = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger();
final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
final Long maxWaitMillis = "-1".equals(context.getProperty(MAX_WAIT_TIME).getValue()) ? -1 : context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
dataSource = new BasicDataSource();
dataSource.setDriverClassName(drv);
@ -282,4 +314,4 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
return "DBCPConnectionPool[id=" + getIdentifier() + "]";
}
}
}

View File

@ -69,6 +69,30 @@ public class DBCPServiceTest {
runner.assertNotValid(service);
}
/**
* Max wait set to -1
*/
@Test
public void testMaxWait() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final DBCPConnectionPool service = new DBCPConnectionPool();
runner.addControllerService("test-good1", service);
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
// set embedded Derby database connection url
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true");
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp");
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver");
runner.setProperty(service, DBCPConnectionPool.MAX_WAIT_TIME, "-1");
runner.enableControllerService(service);
runner.assertValid(service);
}
/**
* Test database connection using Derby. Connect, create table, insert, select, drop table.
*