mirror of https://github.com/apache/nifi.git
DBCPConnectionPool service fixes
Signed-off-by: Toivo Adams <toivo.adams@gmail.com> Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
parent
589e2b7ebf
commit
d1cace6a8a
|
@ -43,6 +43,11 @@
|
|||
<artifactId>derby</artifactId>
|
||||
<version>10.11.1.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.hamcrest</groupId>
|
||||
<artifactId>hamcrest-all</artifactId>
|
||||
<version>1.3</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.List;
|
|||
import org.apache.commons.dbcp.BasicDataSource;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
|
@ -41,29 +42,28 @@ import org.apache.nifi.reporting.InitializationException;
|
|||
@Tags({"dbcp", "jdbc", "database", "connection", "pooling", "store"})
|
||||
@CapabilityDescription("Provides Database Connection Pooling Service. Connections can be asked from pool and returned after usage."
|
||||
)
|
||||
public class DBCPServiceApacheDBCP14 extends AbstractControllerService implements DBCPService {
|
||||
public class DBCPConnectionPool extends AbstractControllerService implements DBCPService {
|
||||
|
||||
public static final DatabaseSystemDescriptor DEFAULT_DATABASE_SYSTEM = DatabaseSystems.getDescriptor("JavaDB");
|
||||
|
||||
public static final PropertyDescriptor DATABASE_SYSTEM = new PropertyDescriptor.Builder()
|
||||
.name("Database")
|
||||
.description("Database management system")
|
||||
// .allowableValues(POSTGRES, JavaDB, DERBY, MariaDB, OtherDB)
|
||||
.allowableValues(DatabaseSystems.knownDatabaseSystems)
|
||||
.defaultValue(DEFAULT_DATABASE_SYSTEM.getValue())
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor DB_HOST = new PropertyDescriptor.Builder()
|
||||
.name("Database host")
|
||||
.description("Database host")
|
||||
.name("Database Host")
|
||||
.description("Database Host")
|
||||
.defaultValue(null)
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor DB_PORT = new PropertyDescriptor.Builder()
|
||||
.name("Database port")
|
||||
.name("Database Port")
|
||||
.description("Database server port")
|
||||
.defaultValue(DEFAULT_DATABASE_SYSTEM.defaultPort.toString())
|
||||
.required(true)
|
||||
|
@ -71,7 +71,7 @@ public class DBCPServiceApacheDBCP14 extends AbstractControllerService implement
|
|||
.build();
|
||||
|
||||
public static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder()
|
||||
.name("Database driver class name")
|
||||
.name("Database Driver Class Name")
|
||||
.description("Database driver class name")
|
||||
.defaultValue(DEFAULT_DATABASE_SYSTEM.driverClassName)
|
||||
.required(true)
|
||||
|
@ -79,7 +79,7 @@ public class DBCPServiceApacheDBCP14 extends AbstractControllerService implement
|
|||
.build();
|
||||
|
||||
public static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
|
||||
.name("Database name")
|
||||
.name("Database Name")
|
||||
.description("Database name")
|
||||
.defaultValue(null)
|
||||
.required(true)
|
||||
|
@ -87,7 +87,7 @@ public class DBCPServiceApacheDBCP14 extends AbstractControllerService implement
|
|||
.build();
|
||||
|
||||
public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
|
||||
.name("Database user")
|
||||
.name("Database User")
|
||||
.description("Database user name")
|
||||
.defaultValue(null)
|
||||
.required(true)
|
||||
|
@ -103,6 +103,26 @@ public class DBCPServiceApacheDBCP14 extends AbstractControllerService implement
|
|||
.sensitive(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MAX_WAIT_MILLIS = new PropertyDescriptor.Builder()
|
||||
.name("Max Wait Millis")
|
||||
.description("The maximum number of milliseconds that the pool will wait (when there are no available connections) "
|
||||
+ " for a connection to be returned before throwing an exception, or -1 to wait indefinitely. ")
|
||||
.defaultValue("500")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.LONG_VALIDATOR)
|
||||
.sensitive(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder()
|
||||
.name("Max Total Connections")
|
||||
.description("The maximum number of active connections that can be allocated from this pool at the same time, "
|
||||
+ " or negative for no limit.")
|
||||
.defaultValue("8")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.INTEGER_VALIDATOR)
|
||||
.sensitive(true)
|
||||
.build();
|
||||
|
||||
private static final List<PropertyDescriptor> properties;
|
||||
|
||||
static {
|
||||
|
@ -126,35 +146,11 @@ public class DBCPServiceApacheDBCP14 extends AbstractControllerService implement
|
|||
return properties;
|
||||
}
|
||||
|
||||
//================================= Apache DBCP pool parameters ================================
|
||||
|
||||
/** The maximum number of milliseconds that the pool will wait (when there are no available connections)
|
||||
* for a connection to be returned before throwing an exception, or -1 to wait indefinitely.
|
||||
*/
|
||||
static final long maxWaitMillis = 500;
|
||||
|
||||
/** The maximum number of active connections that can be allocated from this pool at the same time,
|
||||
* or negative for no limit.
|
||||
*/
|
||||
static final int maxTotal = 8;
|
||||
|
||||
//=================================================================================================
|
||||
|
||||
/**
|
||||
* Idea was to dynamically set port, driver and url properties default values after user select database system.
|
||||
* As of 01mar2015 such functionality is not supported.
|
||||
*
|
||||
@Override
|
||||
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
|
||||
super.onPropertyModified(descriptor, oldValue, newValue);
|
||||
|
||||
if (descriptor.equals(DATABASE_SYSTEM)) {
|
||||
|
||||
DatabaseSystemDescriptor databaseSystemDescriptor = DatabaseSystems.getDescriptor(newValue);
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
* Create new pool, open some connections ready to be used
|
||||
* @param context
|
||||
* @throws InitializationException
|
||||
*/
|
||||
@OnEnabled
|
||||
public void onConfigured(final ConfigurationContext context) throws InitializationException {
|
||||
configContext = context;
|
||||
|
@ -167,6 +163,8 @@ public class DBCPServiceApacheDBCP14 extends AbstractControllerService implement
|
|||
String dbname = context.getProperty(DB_NAME).getValue();
|
||||
String user = context.getProperty(DB_USER).getValue();
|
||||
String passw = context.getProperty(DB_PASSWORD).getValue();
|
||||
Long maxWaitMillis = context.getProperty(MAX_WAIT_MILLIS).asLong();
|
||||
Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger();
|
||||
|
||||
String dburl = dbsystem.buildUrl(host, port, dbname);
|
||||
|
||||
|
@ -179,6 +177,9 @@ public class DBCPServiceApacheDBCP14 extends AbstractControllerService implement
|
|||
dataSource.setUsername(user);
|
||||
dataSource.setPassword(passw);
|
||||
|
||||
// That will ensure that you are using the ClassLoader for you NAR.
|
||||
dataSource.setDriverClassLoader(Thread.currentThread().getContextClassLoader());
|
||||
|
||||
// verify connection can be established.
|
||||
try {
|
||||
Connection con = dataSource.getConnection();
|
||||
|
@ -190,6 +191,19 @@ public class DBCPServiceApacheDBCP14 extends AbstractControllerService implement
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown pool, close all open connections.
|
||||
*/
|
||||
@OnDisabled
|
||||
public void shutdown() {
|
||||
try {
|
||||
dataSource.close();
|
||||
} catch (SQLException e) {
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Connection getConnection() throws ProcessException {
|
||||
try {
|
|
@ -0,0 +1,15 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
org.apache.nifi.dbcp.DBCPConnectionPool
|
|
@ -16,7 +16,8 @@
|
|||
*/
|
||||
package org.apache.nifi.dbcp;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import java.io.File;
|
||||
import java.sql.Connection;
|
||||
|
@ -26,8 +27,6 @@ import java.sql.Statement;
|
|||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.activation.DataSource;
|
||||
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
|
@ -46,11 +45,11 @@ public class DBCPServiceTest {
|
|||
*
|
||||
*/
|
||||
@Test
|
||||
public void testBad1() throws InitializationException {
|
||||
public void testUnknownDatabaseSystem() throws InitializationException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||
final DBCPServiceApacheDBCP14 service = new DBCPServiceApacheDBCP14();
|
||||
final DBCPConnectionPool service = new DBCPConnectionPool();
|
||||
final Map<String, String> properties = new HashMap<String, String>();
|
||||
properties.put(DBCPServiceApacheDBCP14.DATABASE_SYSTEM.getName(), "garbage");
|
||||
properties.put(DBCPConnectionPool.DATABASE_SYSTEM.getName(), "garbage");
|
||||
runner.addControllerService("test-bad2", service, properties);
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
@ -59,9 +58,9 @@ public class DBCPServiceTest {
|
|||
* Missing property values.
|
||||
*/
|
||||
@Test
|
||||
public void testGood1() throws InitializationException {
|
||||
public void testMissingPropertyValues() throws InitializationException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||
final DBCPServiceApacheDBCP14 service = new DBCPServiceApacheDBCP14();
|
||||
final DBCPConnectionPool service = new DBCPConnectionPool();
|
||||
final Map<String, String> properties = new HashMap<String, String>();
|
||||
runner.addControllerService("test-bad1", service, properties);
|
||||
runner.assertNotValid(service);
|
||||
|
@ -73,24 +72,24 @@ public class DBCPServiceTest {
|
|||
*
|
||||
*/
|
||||
@Test
|
||||
public void testGood2() throws InitializationException, SQLException {
|
||||
public void testCreateInsertSelect() throws InitializationException, SQLException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||
final DBCPServiceApacheDBCP14 service = new DBCPServiceApacheDBCP14();
|
||||
final DBCPConnectionPool service = new DBCPConnectionPool();
|
||||
runner.addControllerService("test-good1", service);
|
||||
|
||||
// remove previous test database, if any
|
||||
File dbLocation = new File(DB_LOCATION);
|
||||
dbLocation.delete();
|
||||
|
||||
// Should setProperty call also generate DBCPServiceApacheDBCP14.onPropertyModified() method call?
|
||||
// Should setProperty call also generate DBCPConnectionPool.onPropertyModified() method call?
|
||||
// It does not currently.
|
||||
|
||||
// Some properties already should have JavaDB/Derby default values, let's set only missing values.
|
||||
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_HOST, "NA"); // Embedded Derby don't use host
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_NAME, DB_LOCATION);
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_USER, "tester");
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_PASSWORD, "testerp");
|
||||
runner.setProperty(service, DBCPConnectionPool.DB_HOST, "NA"); // Embedded Derby don't use host
|
||||
runner.setProperty(service, DBCPConnectionPool.DB_NAME, DB_LOCATION);
|
||||
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
|
||||
runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp");
|
||||
|
||||
runner.enableControllerService(service);
|
||||
|
||||
|
@ -117,17 +116,17 @@ public class DBCPServiceTest {
|
|||
@Test
|
||||
public void testExhaustPool() throws InitializationException, SQLException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||
final DBCPServiceApacheDBCP14 service = new DBCPServiceApacheDBCP14();
|
||||
final DBCPConnectionPool service = new DBCPConnectionPool();
|
||||
runner.addControllerService("test-exhaust", service);
|
||||
|
||||
// remove previous test database, if any
|
||||
File dbLocation = new File(DB_LOCATION);
|
||||
dbLocation.delete();
|
||||
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_HOST, "NA"); // Embedded Derby don't use host
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_NAME, DB_LOCATION);
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_USER, "tester");
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_PASSWORD, "testerp");
|
||||
runner.setProperty(service, DBCPConnectionPool.DB_HOST, "NA"); // Embedded Derby don't use host
|
||||
runner.setProperty(service, DBCPConnectionPool.DB_NAME, DB_LOCATION);
|
||||
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
|
||||
runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp");
|
||||
|
||||
runner.enableControllerService(service);
|
||||
|
||||
|
@ -136,6 +135,7 @@ public class DBCPServiceTest {
|
|||
Assert.assertNotNull(dbcpService);
|
||||
|
||||
exception.expect(ProcessException.class);
|
||||
exception.expectMessage("Cannot get a connection, pool error Timeout waiting for idle object");
|
||||
for (int i = 0; i < 100; i++) {
|
||||
Connection connection = dbcpService.getConnection();
|
||||
Assert.assertNotNull(connection);
|
||||
|
@ -148,19 +148,19 @@ public class DBCPServiceTest {
|
|||
* and getConnection should not fail.
|
||||
*/
|
||||
@Test
|
||||
public void testGetMany() throws InitializationException, SQLException {
|
||||
public void testGetManyNormal() throws InitializationException, SQLException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||
final DBCPServiceApacheDBCP14 service = new DBCPServiceApacheDBCP14();
|
||||
final DBCPConnectionPool service = new DBCPConnectionPool();
|
||||
runner.addControllerService("test-exhaust", service);
|
||||
|
||||
// remove previous test database, if any
|
||||
File dbLocation = new File(DB_LOCATION);
|
||||
dbLocation.delete();
|
||||
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_HOST, "NA"); // Embedded Derby don't use host
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_NAME, DB_LOCATION);
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_USER, "tester");
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_PASSWORD, "testerp");
|
||||
runner.setProperty(service, DBCPConnectionPool.DB_HOST, "NA"); // Embedded Derby don't use host
|
||||
runner.setProperty(service, DBCPConnectionPool.DB_NAME, DB_LOCATION);
|
||||
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
|
||||
runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp");
|
||||
|
||||
runner.enableControllerService(service);
|
||||
|
||||
|
@ -177,7 +177,7 @@ public class DBCPServiceTest {
|
|||
|
||||
|
||||
@Test
|
||||
public void testDriverLaod() throws ClassNotFoundException {
|
||||
public void testDriverLoad() throws ClassNotFoundException {
|
||||
Class<?> clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
|
||||
assertNotNull(clazz);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue