Merge branch 'NIFI-322-Db-Connection-Pooling' into develop

This commit is contained in:
Mark Payne 2015-05-20 09:54:16 -04:00
commit 281ebafc2c
19 changed files with 1163 additions and 0 deletions

View File

@ -851,3 +851,32 @@ For details see http://asm.ow2.org/asmdex-license.html
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
THE POSSIBILITY OF SUCH DAMAGE.
The binary distribution of this product bundles 'Hamcrest' which is available
under a BSD license. More details found here: http://hamcrest.org.
Copyright (c) 2000-2006, www.hamcrest.org
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
Redistributions of source code must retain the above copyright notice, this list of
conditions and the following disclaimer. Redistributions in binary form must reproduce
the above copyright notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
Neither the name of Hamcrest nor the names of its contributors may be used to endorse
or promote products derived from this software without specific prior written
permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
DAMAGE.

View File

@ -162,6 +162,12 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-kite-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-nar</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-nar</artifactId>

View File

@ -0,0 +1,31 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-dbcp-service-api</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dbcp;
import java.sql.Connection;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.processor.exception.ProcessException;
/**
* Definition for Database Connection Pooling Service.
*
*/
@Tags({"dbcp", "jdbc", "database", "connection", "pooling", "store"})
@CapabilityDescription("Provides Database Connection Pooling Service. Connections can be asked from pool and returned after usage.")
public interface DBCPService extends ControllerService {
public Connection getConnection() throws ProcessException;
}

View File

@ -0,0 +1,37 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-bundle</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-dbcp-service-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,22 @@
nifi-dbcp-service-nar
Copyright 2014-2015 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
===========================================
Apache Software License v2
===========================================
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons DBCP
The following NOTICE information applies:
Apache Commons DBCP
Copyright 2001-2015 The Apache Software Foundation.
(ASLv2) Apache Derby
The following NOTICE information applies:
Apache Derby
Copyright 2004-2014 Apache, Apache DB, Apache Derby, Apache Torque, Apache JDO, Apache DDLUtils,
the Derby hat logo, the Apache JDO logo, and the Apache feather logo are trademarks of The Apache Software Foundation.

View File

@ -0,0 +1,65 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-bundle</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-dbcp-service</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.11.1.1</version>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,269 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dbcp;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.dbcp.BasicDataSource;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
/**
* Implementation of for Database Connection Pooling Service.
* Apache DBCP is used for connection pooling functionality.
*
*/
@Tags({"dbcp", "jdbc", "database", "connection", "pooling", "store"})
@CapabilityDescription("Provides Database Connection Pooling Service. Connections can be asked from pool and returned after usage.")
public class DBCPConnectionPool extends AbstractControllerService implements DBCPService {
public static final DatabaseSystemDescriptor DEFAULT_DATABASE_SYSTEM = DatabaseSystems.getDescriptor("JavaDB");
public static final PropertyDescriptor DATABASE_SYSTEM = new PropertyDescriptor.Builder()
.name("Database Type")
.description("Database management system")
.allowableValues(DatabaseSystems.knownDatabaseSystems)
.defaultValue(DEFAULT_DATABASE_SYSTEM.getValue())
.required(true)
.build();
public static final PropertyDescriptor DB_HOST = new PropertyDescriptor.Builder()
.name("Database Host")
.description("Database Host")
.defaultValue(null)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DB_PORT = new PropertyDescriptor.Builder()
.name("Database Port")
.description("Database server port")
.required(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder()
.name("Database Driver Class Name")
.description("Database driver class name")
.defaultValue(DEFAULT_DATABASE_SYSTEM.driverClassName)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DB_DRIVER_JAR_URL = new PropertyDescriptor.Builder()
.name("Database Driver Jar Url")
.description("Optional database driver jar file path url. For example 'file:///var/tmp/mariadb-java-client-1.1.7.jar'")
.defaultValue(null)
.required(false)
.addValidator(StandardValidators.URL_VALIDATOR)
.build();
public static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
.name("Database Name")
.description("Database name")
.defaultValue(null)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
.name("Database User")
.description("Database user name")
.defaultValue(null)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.description("The password for the database user")
.defaultValue(null)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder()
.name("Max Wait Time")
.description("The maximum amount of time that the pool will wait (when there are no available connections) "
+ " for a connection to be returned before failing, or -1 to wait indefinitely. ")
.defaultValue("500 millis")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.sensitive(false)
.build();
public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder()
.name("Max Total Connections")
.description("The maximum number of active connections that can be allocated from this pool at the same time, "
+ " or negative for no limit.")
.defaultValue("8")
.required(true)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.sensitive(false)
.build();
private static final List<PropertyDescriptor> properties;
static {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(DATABASE_SYSTEM);
props.add(DB_HOST);
props.add(DB_PORT);
props.add(DB_DRIVERNAME);
props.add(DB_DRIVER_JAR_URL);
props.add(DB_NAME);
props.add(DB_USER);
props.add(DB_PASSWORD);
props.add(MAX_WAIT_TIME);
props.add(MAX_TOTAL_CONNECTIONS);
properties = Collections.unmodifiableList(props);
}
private volatile BasicDataSource dataSource;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
/**
* Create new pool, open some connections ready to be used
* @param context the configuration context
* @throws InitializationException if unable to create a database connection
*/
@OnEnabled
public void onConfigured(final ConfigurationContext context) throws InitializationException {
DatabaseSystemDescriptor dbsystem = DatabaseSystems.getDescriptor( context.getProperty(DATABASE_SYSTEM).getValue() );
String host = context.getProperty(DB_HOST).getValue();
Integer port = context.getProperty(DB_PORT).asInteger();
String drv = context.getProperty(DB_DRIVERNAME).getValue();
String dbname = context.getProperty(DB_NAME).getValue();
String user = context.getProperty(DB_USER).getValue();
String passw = context.getProperty(DB_PASSWORD).getValue();
Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger();
dataSource = new BasicDataSource();
dataSource.setDriverClassName(drv);
// Optional driver URL, when exist, this URL will be used to locate driver jar file location
String urlString = context.getProperty(DB_DRIVER_JAR_URL).getValue();
dataSource.setDriverClassLoader( getDriverClassLoader(urlString, drv) );
String dburl = dbsystem.buildUrl(host, port, dbname);
dataSource.setMaxWait(maxWaitMillis);
dataSource.setMaxActive(maxTotal);
dataSource.setUrl(dburl);
dataSource.setUsername(user);
dataSource.setPassword(passw);
// verify connection can be established.
try {
Connection con = dataSource.getConnection();
if (con==null) {
throw new InitializationException("Connection to database cannot be established.");
}
con.close();
} catch (SQLException e) {
throw new InitializationException(e);
}
}
/**
* using Thread.currentThread().getContextClassLoader();
* will ensure that you are using the ClassLoader for you NAR.
* @throws InitializationException if there is a problem obtaining the ClassLoader
*/
protected ClassLoader getDriverClassLoader(String urlString, String drvName) throws InitializationException {
if (urlString!=null && urlString.length()>0) {
try {
URL[] urls = new URL[] { new URL(urlString) };
URLClassLoader ucl = new URLClassLoader(urls);
// Workaround which allows to use URLClassLoader for JDBC driver loading.
// (Because the DriverManager will refuse to use a driver not loaded by the system ClassLoader.)
Class<?> clazz = Class.forName(drvName, true, ucl);
if (clazz==null) {
throw new InitializationException("Can't load Database Driver " + drvName);
}
Driver driver = (Driver) clazz.newInstance();
DriverManager.registerDriver( new DriverShim(driver) );
return ucl;
} catch (MalformedURLException e) {
throw new InitializationException("Invalid Database Driver Jar Url", e);
} catch (Exception e) {
throw new InitializationException("Can't load Database Driver", e);
}
} else {
// That will ensure that you are using the ClassLoader for you NAR.
return Thread.currentThread().getContextClassLoader();
}
}
/**
* Shutdown pool, close all open connections.
*/
@OnDisabled
public void shutdown() {
try {
dataSource.close();
} catch (SQLException e) {
throw new ProcessException(e);
}
}
@Override
public Connection getConnection() throws ProcessException {
try {
Connection con = dataSource.getConnection();
return con;
} catch (SQLException e) {
throw new ProcessException(e);
}
}
@Override
public String toString() {
return "DBCPConnectionPool[id=" + getIdentifier() + "]";
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dbcp;
import java.text.MessageFormat;
import org.apache.nifi.components.AllowableValue;
/**
* An immutable object for holding information about a database system.
*
*/
public class DatabaseSystemDescriptor extends AllowableValue {
public final String driverClassName;
public final Integer defaultPort;
public final String urlTemplate;
public final boolean internalDriverJar;
public DatabaseSystemDescriptor(String value, String description, String driverClassName, Integer defaultPort, String urlTemplate, boolean internalDriverJar) {
super(value, value, description);
if (defaultPort==null)
throw new IllegalArgumentException("defaultPort cannot be null");
this.driverClassName = driverClassName;
this.defaultPort = defaultPort;
this.urlTemplate = urlTemplate;
this.internalDriverJar = internalDriverJar;
}
public String buildUrl(String host, Integer port, String dbname) {
return MessageFormat.format(urlTemplate, host, port.toString(), dbname);
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dbcp;
public class DatabaseSystems {
/**
* Currently contain only few known Database systems.
* Please help to expand this list.
*
* Please be ensure that all JDBC drivers are license-compatible with Apache.
* http://www.apache.org/legal/resolved.html
* If not include them in "JDBC driver jar must be loaded from external location" section
* and do not include actual driver in NiFi distribution (don't include driver in pom.xml file)
*
* {0} host name/ip
* {1} port number
* {2} database name
*
* for example url template
* "jdbc:postgresql://{0}:{1}/{2}"
* will be after building
* "jdbc:postgresql://bighost:5432/Trove"
*/
public static DatabaseSystemDescriptor[] knownDatabaseSystems = {
// ================= JDBC driver jar should be included in nar (in pom.xml dependencies) =======================
new DatabaseSystemDescriptor("Postgres", "PostgreSQL open soure object-relational database.",
"org.postgresql.Driver", 5432, "jdbc:postgresql://{0}:{1}/{2}", true),
new DatabaseSystemDescriptor("JavaDB", "Java DB is Oracle's supported distribution of the Apache Derby open source database. Included in JDK.",
"org.apache.derby.jdbc.EmbeddedDriver", 1, "jdbc:derby:{2};create=true", true),
new DatabaseSystemDescriptor("Derby", "Apache Derby is an open source relational database.",
"org.apache.derby.jdbc.EmbeddedDriver", 1, "jdbc:derby:{2};create=true", true),
// ================= JDBC driver jar must be loaded from external location =======================
// Such drivers cannot be included in NiFi distribution because are not license-compatible with Apache.
new DatabaseSystemDescriptor("MariaDB",
"MariaDB is a community-developed fork of the MySQL relational database management system intended to remain free under the GNU GPL.",
"org.mariadb.jdbc.Driver", 3306, "jdbc:mariadb://{0}:{1}/{2}", false),
new DatabaseSystemDescriptor("Oracle",
"Oracle Database is an object-relational database management system.",
"oracle.jdbc.OracleDriver", 1521, "jdbc:oracle:thin:@//{0}:{1}/{2}", false),
new DatabaseSystemDescriptor("Sybase",
"Sybase is an relational database management system.",
"com.sybase.jdbc3.jdbc.SybDriver", 5000, "jdbc:sybase:Tds:{0}:{1}/{2}", false),
// ================= Unknown JDBC driver, user must provide connection details =====================
new DatabaseSystemDescriptor("Other DB", "Other JDBC compliant JDBC driver",
null, 1, null, false),
};
public static DatabaseSystemDescriptor getDescriptor(String name) {
for ( DatabaseSystemDescriptor descr : DatabaseSystems.knownDatabaseSystems) {
if (descr.getValue().equalsIgnoreCase(name))
return descr;
}
throw new IllegalArgumentException("Can't find DatabaseSystemDescriptor by name " + name);
}
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dbcp;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverPropertyInfo;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Properties;
import java.util.logging.Logger;
/**
* Workaround which allows to use URLClassLoader for JDBC driver loading.
* (Because the DriverManager will refuse to use a driver not loaded by the system ClassLoader.)
*
*/
class DriverShim implements Driver {
private Driver driver;
DriverShim(Driver d) {
this.driver = d;
}
@Override
public boolean acceptsURL(String u) throws SQLException {
return this.driver.acceptsURL(u);
}
@Override
public Connection connect(String u, Properties p) throws SQLException {
return this.driver.connect(u, p);
}
@Override
public int getMajorVersion() {
return this.driver.getMajorVersion();
}
@Override
public int getMinorVersion() {
return this.driver.getMinorVersion();
}
@Override
public DriverPropertyInfo[] getPropertyInfo(String u, Properties p) throws SQLException {
return this.driver.getPropertyInfo(u, p);
}
@Override
public boolean jdbcCompliant() {
return this.driver.jdbcCompliant();
}
@Override
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
return driver.getParentLogger();
}
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.dbcp.DBCPConnectionPool

View File

@ -0,0 +1,331 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dbcp;
import static org.apache.nifi.dbcp.DatabaseSystems.getDescriptor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class DBCPServiceTest {
final static String DB_LOCATION = "target/db";
@BeforeClass
public static void setup() {
System.setProperty("derby.stream.error.file", "target/derby.log");
}
/**
* Unknown database system.
*
*/
@Test
public void testUnknownDatabaseSystem() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final DBCPConnectionPool service = new DBCPConnectionPool();
final Map<String, String> properties = new HashMap<String, String>();
properties.put(DBCPConnectionPool.DATABASE_SYSTEM.getName(), "garbage");
runner.addControllerService("test-bad2", service, properties);
runner.assertNotValid(service);
}
/**
* Missing property values.
*/
@Test
public void testMissingPropertyValues() throws InitializationException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final DBCPConnectionPool service = new DBCPConnectionPool();
final Map<String, String> properties = new HashMap<String, String>();
runner.addControllerService("test-bad1", service, properties);
runner.assertNotValid(service);
}
/**
* Test database connection using Derby.
* Connect, create table, insert, select, drop table.
*
*/
@Test
public void testCreateInsertSelect() throws InitializationException, SQLException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final DBCPConnectionPool service = new DBCPConnectionPool();
runner.addControllerService("test-good1", service);
// remove previous test database, if any
File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
// Should setProperty call also generate DBCPConnectionPool.onPropertyModified() method call?
// It does not currently.
// Some properties already should have JavaDB/Derby default values, let's set only missing values.
runner.setProperty(service, DBCPConnectionPool.DB_HOST, "NA"); // Embedded Derby don't use host
runner.setProperty(service, DBCPConnectionPool.DB_PORT, "1"); // Embedded Derby don't use port, but must have value anyway
runner.setProperty(service, DBCPConnectionPool.DB_NAME, DB_LOCATION);
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp");
runner.enableControllerService(service);
runner.assertValid(service);
DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-good1");
Assert.assertNotNull(dbcpService);
Connection connection = dbcpService.getConnection();
Assert.assertNotNull(connection);
createInsertSelectDrop(connection);
connection.close(); // return to pool
}
/**
* NB!!!!
* Prerequisite: file should be present in /var/tmp/mariadb-java-client-1.1.7.jar
* Prerequisite: access to running MariaDb database server
*
* Test database connection using external JDBC jar located by URL.
* Connect, create table, insert, select, drop table.
*
*/
@Ignore
@Test
public void testExternalJDBCDriverUsage() throws InitializationException, SQLException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final DBCPConnectionPool service = new DBCPConnectionPool();
runner.addControllerService("test-external-jar", service);
DatabaseSystemDescriptor mariaDb = getDescriptor("MariaDB");
assertNotNull(mariaDb);
// Set MariaDB properties values.
runner.setProperty(service, DBCPConnectionPool.DATABASE_SYSTEM, mariaDb.getValue());
runner.setProperty(service, DBCPConnectionPool.DB_PORT, mariaDb.defaultPort.toString());
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, mariaDb.driverClassName);
runner.setProperty(service, DBCPConnectionPool.DB_DRIVER_JAR_URL, "file:///var/tmp/mariadb-java-client-1.1.7.jar");
runner.setProperty(service, DBCPConnectionPool.DB_HOST, "localhost"); // localhost
runner.setProperty(service, DBCPConnectionPool.DB_NAME, "testdb");
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp");
runner.enableControllerService(service);
runner.assertValid(service);
DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-external-jar");
Assert.assertNotNull(dbcpService);
Connection connection = dbcpService.getConnection();
Assert.assertNotNull(connection);
createInsertSelectDrop(connection);
connection.close(); // return to pool
}
@Rule
public ExpectedException exception = ExpectedException.none();
/**
* Test get database connection using Derby.
* Get many times, after a while pool should not contain any available connection
* and getConnection should fail.
*/
@Test
public void testExhaustPool() throws InitializationException, SQLException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final DBCPConnectionPool service = new DBCPConnectionPool();
runner.addControllerService("test-exhaust", service);
// remove previous test database, if any
File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
runner.setProperty(service, DBCPConnectionPool.DB_HOST, "NA"); // Embedded Derby don't use host
runner.setProperty(service, DBCPConnectionPool.DB_PORT, "1"); // Embedded Derby don't use port, but must have value anyway
runner.setProperty(service, DBCPConnectionPool.DB_NAME, DB_LOCATION);
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp");
runner.enableControllerService(service);
runner.assertValid(service);
DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-exhaust");
Assert.assertNotNull(dbcpService);
exception.expect(ProcessException.class);
exception.expectMessage("Cannot get a connection, pool error Timeout waiting for idle object");
for (int i = 0; i < 100; i++) {
Connection connection = dbcpService.getConnection();
Assert.assertNotNull(connection);
}
}
/**
* Test get database connection using Derby.
* Get many times, release immediately
* and getConnection should not fail.
*/
@Test
public void testGetManyNormal() throws InitializationException, SQLException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final DBCPConnectionPool service = new DBCPConnectionPool();
runner.addControllerService("test-exhaust", service);
// remove previous test database, if any
File dbLocation = new File(DB_LOCATION);
dbLocation.delete();
runner.setProperty(service, DBCPConnectionPool.DB_HOST, "NA"); // Embedded Derby don't use host
runner.setProperty(service, DBCPConnectionPool.DB_PORT, "1"); // Embedded Derby don't use port, but must have value anyway
runner.setProperty(service, DBCPConnectionPool.DB_NAME, DB_LOCATION);
runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
runner.setProperty(service, DBCPConnectionPool.DB_PASSWORD, "testerp");
runner.enableControllerService(service);
runner.assertValid(service);
DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-exhaust");
Assert.assertNotNull(dbcpService);
for (int i = 0; i < 1000; i++) {
Connection connection = dbcpService.getConnection();
Assert.assertNotNull(connection);
connection.close(); // will return connection to pool
}
}
@Test
public void testDriverLoad() throws ClassNotFoundException {
Class<?> clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
assertNotNull(clazz);
}
/**
* NB!!!!
* Prerequisite: file should be present in /var/tmp/mariadb-java-client-1.1.7.jar
*/
@Test
@Ignore("Intended only for local testing, not automated testing")
public void testURLClassLoader() throws ClassNotFoundException, MalformedURLException, SQLException, InstantiationException, IllegalAccessException {
URL url = new URL("file:///var/tmp/mariadb-java-client-1.1.7.jar");
URL[] urls = new URL[] { url };
ClassLoader parent = Thread.currentThread().getContextClassLoader();
URLClassLoader ucl = new URLClassLoader(urls,parent);
Class<?> clazz = Class.forName("org.mariadb.jdbc.Driver", true, ucl);
assertNotNull(clazz);
Driver driver = (Driver) clazz.newInstance();
Driver shim = new DriverShim(driver);
DriverManager.registerDriver( shim );
Driver driver2 = DriverManager.getDriver("jdbc:mariadb://localhost:3306/testdb");
assertNotNull(driver2);
}
/**
* NB!!!!
* Prerequisite: file should be present in /var/tmp/mariadb-java-client-1.1.7.jar
* Prerequisite: access to running MariaDb database server
*/
@Test
@Ignore("Intended only for local testing, not automated testing")
public void testURLClassLoaderGetConnection() throws ClassNotFoundException, MalformedURLException, SQLException, InstantiationException, IllegalAccessException {
URL url = new URL("file:///var/tmp/mariadb-java-client-1.1.7.jar");
URL[] urls = new URL[] { url };
ClassLoader parent = Thread.currentThread().getContextClassLoader();
URLClassLoader ucl = new URLClassLoader(urls,parent);
Class<?> clazz = Class.forName("org.mariadb.jdbc.Driver", true, ucl);
assertNotNull(clazz);
Driver driver = (Driver) clazz.newInstance();
Driver shim = new DriverShim(driver);
DriverManager.registerDriver( shim );
Driver driver2 = DriverManager.getDriver("jdbc:mariadb://localhost:3306/testdb");
assertNotNull(driver2);
Connection connection = DriverManager.getConnection("jdbc:mariadb://localhost:3306/testdb","tester","testerp");
assertNotNull(connection);
connection.close();
DriverManager.deregisterDriver(shim);
}
String createTable = "create table restaurants(id integer, name varchar(20), city varchar(50))";
String dropTable = "drop table restaurants";
protected void createInsertSelectDrop( Connection con) throws SQLException {
Statement st = con.createStatement();
try {
st.executeUpdate(dropTable);
} catch (Exception e) {
// table may not exist, this is not serious problem.
}
st.executeUpdate(createTable);
st.executeUpdate("insert into restaurants values (1, 'Irifunes', 'San Mateo')");
st.executeUpdate("insert into restaurants values (2, 'Estradas', 'Daly City')");
st.executeUpdate("insert into restaurants values (3, 'Prime Rib House', 'San Francisco')");
int nrOfRows = 0;
ResultSet resultSet = st.executeQuery("select * from restaurants");
while (resultSet.next())
nrOfRows++;
assertEquals(3, nrOfRows);
st.close();
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dbcp;
import static org.apache.nifi.dbcp.DatabaseSystems.getDescriptor;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
public class TestDatabaseSystems {
@Test
public void testKnownDatabaseSystems() {
assertEquals("jdbc:postgresql://bighost:5432/Trove", getDescriptor("Postgres").buildUrl("bighost",5432,"Trove"));
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dbcp;
import java.util.ArrayList;
import java.util.List;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
public class TestProcessor extends AbstractProcessor {
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> propDescs = new ArrayList<>();
propDescs.add(new PropertyDescriptor.Builder()
.name("DBCPService test processor")
.description("DBCPService test processor")
.identifiesControllerService(DBCPService.class)
.required(true)
.build());
return propDescs;
}
}

View File

@ -0,0 +1,29 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-dbcp-service-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-dbcp-service</module>
<module>nifi-dbcp-service-nar</module>
</modules>
</project>

View File

@ -42,5 +42,11 @@
<artifactId>nifi-http-context-map-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@ -31,5 +31,7 @@
<module>nifi-ssl-context-service-api</module>
<module>nifi-http-context-map-bundle</module>
<module>nifi-standard-services-api-nar</module>
<module>nifi-dbcp-service-api</module>
<module>nifi-dbcp-service-bundle</module>
</modules>
</project>

View File

@ -832,6 +832,11 @@
<artifactId>nifi-write-ahead-log</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service</artifactId>
<version>0.1.1-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>