nifi-2381 Connection Pooling Service -Drop invalid connections and create new ones.

This closes #986.
This commit is contained in:
Toivo Adams 2016-09-04 13:40:24 +03:00 committed by Pierre Villard
parent ad3d63d204
commit 4fc0e9c407
3 changed files with 243 additions and 0 deletions

View File

@ -52,6 +52,30 @@
<groupId>org.apache.derby</groupId> <groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId> <artifactId>derby</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derbynet</artifactId>
<version>10.11.1.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derbytools</artifactId>
<version>10.11.1.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derbyclient</artifactId>
<version>10.11.1.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.192</version>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.hamcrest</groupId> <groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId> <artifactId>hamcrest-all</artifactId>

View File

@ -114,6 +114,17 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
.sensitive(false) .sensitive(false)
.build(); .build();
public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder()
.name("Validation-query")
.displayName("Validation query")
.description("Validation query used to validate connections before returning them. "
+ "When connection is invalid, it get's dropped and new valid connection will be returned. "
+ "Note!! Using validation might have some performance penalty.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
private static final List<PropertyDescriptor> properties; private static final List<PropertyDescriptor> properties;
static { static {
@ -125,6 +136,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
props.add(DB_PASSWORD); props.add(DB_PASSWORD);
props.add(MAX_WAIT_TIME); props.add(MAX_WAIT_TIME);
props.add(MAX_TOTAL_CONNECTIONS); props.add(MAX_TOTAL_CONNECTIONS);
props.add(VALIDATION_QUERY);
properties = Collections.unmodifiableList(props); properties = Collections.unmodifiableList(props);
} }
@ -158,6 +170,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue(); final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS); final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger(); final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger();
final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
dataSource = new BasicDataSource(); dataSource = new BasicDataSource();
dataSource.setDriverClassName(drv); dataSource.setDriverClassName(drv);
@ -171,6 +184,11 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
dataSource.setMaxWait(maxWaitMillis); dataSource.setMaxWait(maxWaitMillis);
dataSource.setMaxActive(maxTotal); dataSource.setMaxActive(maxTotal);
if (validationQuery!=null && !validationQuery.isEmpty()) {
dataSource.setValidationQuery(validationQuery);
dataSource.setTestOnBorrow(true);
}
dataSource.setUrl(dburl); dataSource.setUrl(dburl);
dataSource.setUsername(user); dataSource.setUsername(user);
dataSource.setPassword(passw); dataSource.setPassword(passw);

View File

@ -16,10 +16,13 @@
*/ */
package org.apache.nifi.dbcp; package org.apache.nifi.dbcp;
import org.apache.derby.drda.NetworkServerControl;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.h2.jdbc.JdbcSQLException;
import org.h2.tools.Server;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Ignore; import org.junit.Ignore;
@ -28,6 +31,8 @@ import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
import java.io.File; import java.io.File;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.net.URLClassLoader; import java.net.URLClassLoader;
@ -42,6 +47,7 @@ import java.util.Map;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class DBCPServiceTest { public class DBCPServiceTest {
@ -166,6 +172,201 @@ public class DBCPServiceTest {
} }
} }
/**
* Test Drop invalid connections and create new ones.
* Default behavior, invalid connections in pool.
*/
@Test
public void testDropInvalidConnectionsH2_Default() throws Exception {
// start the H2 TCP Server
String[] args = new String[0];
Server server = Server.createTcpServer(args).start();
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final DBCPConnectionPool service = new DBCPConnectionPool();
runner.addControllerService("test-dropcreate", service);
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:h2:tcp://localhost/~/test");
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.h2.Driver");
runner.enableControllerService(service);
runner.assertValid(service);
final DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-dropcreate");
Assert.assertNotNull(dbcpService);
// get and verify connections
for (int i = 0; i < 10; i++) {
final Connection connection = dbcpService.getConnection();
System.out.println(connection);
Assert.assertNotNull(connection);
assertValidConnectionH2(connection, i);
connection.close();
}
// restart server, connections in pool should became invalid
server.stop();
server.shutdown();
server.start();
// Note!! We should get something like:
// org.h2.jdbc.JdbcSQLException: Connection is broken: "session closed" [90067-192]
exception.expect(JdbcSQLException.class);
for (int i = 0; i < 10; i++) {
final Connection connection = dbcpService.getConnection();
System.out.println(connection);
Assert.assertNotNull(connection);
assertValidConnectionH2(connection, i);
connection.close();
}
server.shutdown();
}
/**
* Test Drop invalid connections and create new ones.
* Better behavior, invalid connections are dropped and valid created.
*/
@Test
public void testDropInvalidConnectionsH2_Better() throws Exception {
// start the H2 TCP Server
String[] args = new String[0];
Server server = Server.createTcpServer(args).start();
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final DBCPConnectionPool service = new DBCPConnectionPool();
runner.addControllerService("test-dropcreate", service);
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:h2:tcp://localhost/~/test");
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.h2.Driver");
runner.setProperty(service, DBCPConnectionPool.VALIDATION_QUERY, "SELECT 5");
runner.enableControllerService(service);
runner.assertValid(service);
final DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-dropcreate");
Assert.assertNotNull(dbcpService);
// get and verify connections
for (int i = 0; i < 10; i++) {
final Connection connection = dbcpService.getConnection();
System.out.println(connection);
Assert.assertNotNull(connection);
assertValidConnectionH2(connection, i);
connection.close();
}
// restart server, connections in pool should became invalid
server.stop();
server.shutdown();
server.start();
// Note!! We should not get something like:
// org.h2.jdbc.JdbcSQLException: Connection is broken: "session closed" [90067-192]
// Pool should remove invalid connections and create new valid connections.
for (int i = 0; i < 10; i++) {
final Connection connection = dbcpService.getConnection();
System.out.println(connection);
Assert.assertNotNull(connection);
assertValidConnectionH2(connection, i);
connection.close();
}
server.shutdown();
}
private void assertValidConnectionH2(Connection connection, int num) throws SQLException {
try (Statement statement = connection.createStatement()) {
ResultSet rs = statement.executeQuery("SELECT " + num);
assertTrue(rs.next());
int value = rs.getInt(1);
assertEquals(num, value);
assertTrue(connection.isValid(20));
}
}
/**
* Note!! Derby keeps something open even after server shutdown.
* So it's difficult to get invalid connections.
*
* Test Drop invalid connections and create new ones.
*/
@Ignore
@Test
public void testDropInvalidConnectionsDerby() throws Exception {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
if (dbLocation.exists())
throw new RuntimeException("Still exists " + dbLocation.getAbsolutePath());
// Start Derby server.
System.setProperty("derby.drda.startNetworkServer", "true");
System.setProperty("derby.system.home", DB_LOCATION);
NetworkServerControl serverControl = new NetworkServerControl(InetAddress.getLocalHost(),1527);
serverControl.start(new PrintWriter(System.out, true));
// create sample database
Class.forName("org.apache.derby.jdbc.ClientDriver");
Connection conn = DriverManager.getConnection("jdbc:derby://127.0.0.1:1527/sample;create=true");
conn.close();
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final DBCPConnectionPool service = new DBCPConnectionPool();
runner.addControllerService("test-dropcreate", service);
// set Derby database props
runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + "//127.0.0.1:1527/sample");
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.ClientDriver");
runner.enableControllerService(service);
runner.assertValid(service);
final DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-dropcreate");
Assert.assertNotNull(dbcpService);
for (int i = 0; i < 10; i++) {
final Connection connection = dbcpService.getConnection();
System.out.println(connection);
Assert.assertNotNull(connection);
assertValidConnectionDerby(connection, i);
connection.close();
}
serverControl.shutdown();
dbLocation.delete();
if (dbLocation.exists())
throw new RuntimeException("Still exists " + dbLocation.getAbsolutePath());
try {
serverControl.ping();
} catch (Exception e) {
}
Thread.sleep(2000);
for (int i = 0; i < 10; i++) {
final Connection connection = dbcpService.getConnection();
System.out.println(connection);
Assert.assertNotNull(connection);
assertValidConnectionDerby(connection, i);
connection.close();
}
}
private void assertValidConnectionDerby(Connection connection, int num) throws SQLException {
try (Statement statement = connection.createStatement()) {
ResultSet rs = statement.executeQuery("SELECT " + num + " FROM SYSIBM.SYSDUMMY1");
assertTrue(rs.next());
int value = rs.getInt(1);
assertEquals(num, value);
assertTrue(connection.isValid(20));
}
}
/** /**
* Test get database connection using Derby. Get many times, release immediately and getConnection should not fail. * Test get database connection using Derby. Get many times, release immediately and getConnection should not fail.
*/ */