From 4fc0e9c40763d05eee01b48f76cd4cf273b61330 Mon Sep 17 00:00:00 2001 From: Toivo Adams Date: Sun, 4 Sep 2016 13:40:24 +0300 Subject: [PATCH] nifi-2381 Connection Pooling Service -Drop invalid connections and create new ones. This closes #986. --- .../nifi-dbcp-service/pom.xml | 24 +++ .../apache/nifi/dbcp/DBCPConnectionPool.java | 18 ++ .../org/apache/nifi/dbcp/DBCPServiceTest.java | 201 ++++++++++++++++++ 3 files changed, 243 insertions(+) diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml index 85c460f3d5..6891cb92da 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml @@ -52,6 +52,30 @@ org.apache.derby derby + + org.apache.derby + derbynet + 10.11.1.1 + test + + + org.apache.derby + derbytools + 10.11.1.1 + test + + + org.apache.derby + derbyclient + 10.11.1.1 + test + + + com.h2database + h2 + 1.4.192 + test + org.hamcrest hamcrest-all diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java index 522a720f53..9bb5a477ea 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java @@ -114,6 +114,17 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC .sensitive(false) .build(); + public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder() + .name("Validation-query") + .displayName("Validation query") + .description("Validation query used to validate connections before returning them. " + + "When connection is invalid, it get's dropped and new valid connection will be returned. " + + "Note!! Using validation might have some performance penalty.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + private static final List properties; static { @@ -125,6 +136,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC props.add(DB_PASSWORD); props.add(MAX_WAIT_TIME); props.add(MAX_TOTAL_CONNECTIONS); + props.add(VALIDATION_QUERY); properties = Collections.unmodifiableList(props); } @@ -158,6 +170,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue(); final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS); final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger(); + final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue(); dataSource = new BasicDataSource(); dataSource.setDriverClassName(drv); @@ -171,6 +184,11 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC dataSource.setMaxWait(maxWaitMillis); dataSource.setMaxActive(maxTotal); + if (validationQuery!=null && !validationQuery.isEmpty()) { + dataSource.setValidationQuery(validationQuery); + dataSource.setTestOnBorrow(true); + } + dataSource.setUrl(dburl); dataSource.setUsername(user); dataSource.setPassword(passw); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java index 1e2b8d57bb..6234cbebc5 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java @@ -16,10 +16,13 @@ */ package org.apache.nifi.dbcp; +import org.apache.derby.drda.NetworkServerControl; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.h2.jdbc.JdbcSQLException; +import org.h2.tools.Server; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Ignore; @@ -28,6 +31,8 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import java.io.File; +import java.io.PrintWriter; +import java.net.InetAddress; import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; @@ -42,6 +47,7 @@ import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class DBCPServiceTest { @@ -166,6 +172,201 @@ public class DBCPServiceTest { } } + /** + * Test Drop invalid connections and create new ones. + * Default behavior, invalid connections in pool. + */ + @Test + public void testDropInvalidConnectionsH2_Default() throws Exception { + + // start the H2 TCP Server + String[] args = new String[0]; + Server server = Server.createTcpServer(args).start(); + + final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + final DBCPConnectionPool service = new DBCPConnectionPool(); + runner.addControllerService("test-dropcreate", service); + + runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:h2:tcp://localhost/~/test"); + runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.h2.Driver"); + runner.enableControllerService(service); + + runner.assertValid(service); + final DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-dropcreate"); + Assert.assertNotNull(dbcpService); + + // get and verify connections + for (int i = 0; i < 10; i++) { + final Connection connection = dbcpService.getConnection(); + System.out.println(connection); + Assert.assertNotNull(connection); + assertValidConnectionH2(connection, i); + connection.close(); + } + + // restart server, connections in pool should became invalid + server.stop(); + server.shutdown(); + server.start(); + + // Note!! We should get something like: + // org.h2.jdbc.JdbcSQLException: Connection is broken: "session closed" [90067-192] + exception.expect(JdbcSQLException.class); + for (int i = 0; i < 10; i++) { + final Connection connection = dbcpService.getConnection(); + System.out.println(connection); + Assert.assertNotNull(connection); + assertValidConnectionH2(connection, i); + connection.close(); + } + + server.shutdown(); + } + + /** + * Test Drop invalid connections and create new ones. + * Better behavior, invalid connections are dropped and valid created. + */ + @Test + public void testDropInvalidConnectionsH2_Better() throws Exception { + + // start the H2 TCP Server + String[] args = new String[0]; + Server server = Server.createTcpServer(args).start(); + + final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + final DBCPConnectionPool service = new DBCPConnectionPool(); + runner.addControllerService("test-dropcreate", service); + + runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:h2:tcp://localhost/~/test"); + runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.h2.Driver"); + runner.setProperty(service, DBCPConnectionPool.VALIDATION_QUERY, "SELECT 5"); + runner.enableControllerService(service); + + runner.assertValid(service); + final DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-dropcreate"); + Assert.assertNotNull(dbcpService); + + // get and verify connections + for (int i = 0; i < 10; i++) { + final Connection connection = dbcpService.getConnection(); + System.out.println(connection); + Assert.assertNotNull(connection); + assertValidConnectionH2(connection, i); + connection.close(); + } + + // restart server, connections in pool should became invalid + server.stop(); + server.shutdown(); + server.start(); + + // Note!! We should not get something like: + // org.h2.jdbc.JdbcSQLException: Connection is broken: "session closed" [90067-192] + // Pool should remove invalid connections and create new valid connections. + for (int i = 0; i < 10; i++) { + final Connection connection = dbcpService.getConnection(); + System.out.println(connection); + Assert.assertNotNull(connection); + assertValidConnectionH2(connection, i); + connection.close(); + } + + server.shutdown(); + } + + private void assertValidConnectionH2(Connection connection, int num) throws SQLException { + try (Statement statement = connection.createStatement()) { + ResultSet rs = statement.executeQuery("SELECT " + num); + assertTrue(rs.next()); + int value = rs.getInt(1); + assertEquals(num, value); + assertTrue(connection.isValid(20)); + } + } + + /** + * Note!! Derby keeps something open even after server shutdown. + * So it's difficult to get invalid connections. + * + * Test Drop invalid connections and create new ones. + */ + @Ignore + @Test + public void testDropInvalidConnectionsDerby() throws Exception { + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + if (dbLocation.exists()) + throw new RuntimeException("Still exists " + dbLocation.getAbsolutePath()); + + // Start Derby server. + System.setProperty("derby.drda.startNetworkServer", "true"); + System.setProperty("derby.system.home", DB_LOCATION); + NetworkServerControl serverControl = new NetworkServerControl(InetAddress.getLocalHost(),1527); + serverControl.start(new PrintWriter(System.out, true)); + + // create sample database + Class.forName("org.apache.derby.jdbc.ClientDriver"); + Connection conn = DriverManager.getConnection("jdbc:derby://127.0.0.1:1527/sample;create=true"); + conn.close(); + + final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + final DBCPConnectionPool service = new DBCPConnectionPool(); + runner.addControllerService("test-dropcreate", service); + + // set Derby database props + runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, "jdbc:derby:" + "//127.0.0.1:1527/sample"); + runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester"); + runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.ClientDriver"); + + runner.enableControllerService(service); + + runner.assertValid(service); + final DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-dropcreate"); + Assert.assertNotNull(dbcpService); + + for (int i = 0; i < 10; i++) { + final Connection connection = dbcpService.getConnection(); + System.out.println(connection); + Assert.assertNotNull(connection); + assertValidConnectionDerby(connection, i); + connection.close(); + } + + serverControl.shutdown(); + dbLocation.delete(); + if (dbLocation.exists()) + throw new RuntimeException("Still exists " + dbLocation.getAbsolutePath()); + try { + serverControl.ping(); + } catch (Exception e) { + } + + Thread.sleep(2000); + + for (int i = 0; i < 10; i++) { + final Connection connection = dbcpService.getConnection(); + System.out.println(connection); + Assert.assertNotNull(connection); + assertValidConnectionDerby(connection, i); + connection.close(); + } + + } + + + private void assertValidConnectionDerby(Connection connection, int num) throws SQLException { + try (Statement statement = connection.createStatement()) { + ResultSet rs = statement.executeQuery("SELECT " + num + " FROM SYSIBM.SYSDUMMY1"); + assertTrue(rs.next()); + int value = rs.getInt(1); + assertEquals(num, value); + assertTrue(connection.isValid(20)); + } + } + /** * Test get database connection using Derby. Get many times, release immediately and getConnection should not fail. */