mirror of https://github.com/apache/nifi.git
Initial Database connection pooling service implementation NIFI-322
Signed-off-by: Toivo Adams <toivo.adams@gmail.com> Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
parent
5273a63bf1
commit
589e2b7ebf
|
@ -0,0 +1,18 @@
|
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-services</artifactId>
|
||||
<version>0.0.2-incubating-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>nifi-dbcp-service-api</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.dbcp;
|
||||
|
||||
import java.sql.Connection;
|
||||
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.controller.ControllerService;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
|
||||
/**
|
||||
* Definition for Database Connection Pooling Service.
|
||||
*
|
||||
*/
|
||||
@Tags({"dbcp", "jdbc", "database", "connection", "pooling", "store"})
|
||||
@CapabilityDescription("Provides Database Connection Pooling Service. Connections can be asked from pool and returned after usage."
|
||||
)
|
||||
public interface DBCPService extends ControllerService {
|
||||
|
||||
|
||||
public Connection getConnection() throws ProcessException;
|
||||
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
|
||||
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-dbcp-service-bundle</artifactId>
|
||||
<version>0.0.2-incubating-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>nifi-dbcp-service-nar</artifactId>
|
||||
<packaging>nar</packaging>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-services-api-nar</artifactId>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-dbcp-service</artifactId>
|
||||
<version>0.0.2-incubating-SNAPSHOT</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
|
||||
|
||||
</project>
|
|
@ -0,0 +1,49 @@
|
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
|
||||
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-dbcp-service-bundle</artifactId>
|
||||
<version>0.0.2-incubating-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>nifi-dbcp-service</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-dbcp-service-api</artifactId>
|
||||
<version>0.0.2-incubating-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-processor-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-security-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mock</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-dbcp</groupId>
|
||||
<artifactId>commons-dbcp</artifactId>
|
||||
<version>1.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.derby</groupId>
|
||||
<artifactId>derby</artifactId>
|
||||
<version>10.11.1.1</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,208 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.dbcp;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.dbcp.BasicDataSource;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
|
||||
/**
|
||||
* Implementation of for Database Connection Pooling Service.
|
||||
* Apache DBCP is used for connection pooling functionality.
|
||||
*
|
||||
*/
|
||||
@Tags({"dbcp", "jdbc", "database", "connection", "pooling", "store"})
|
||||
@CapabilityDescription("Provides Database Connection Pooling Service. Connections can be asked from pool and returned after usage."
|
||||
)
|
||||
public class DBCPServiceApacheDBCP14 extends AbstractControllerService implements DBCPService {
|
||||
|
||||
public static final DatabaseSystemDescriptor DEFAULT_DATABASE_SYSTEM = DatabaseSystems.getDescriptor("JavaDB");
|
||||
|
||||
public static final PropertyDescriptor DATABASE_SYSTEM = new PropertyDescriptor.Builder()
|
||||
.name("Database")
|
||||
.description("Database management system")
|
||||
// .allowableValues(POSTGRES, JavaDB, DERBY, MariaDB, OtherDB)
|
||||
.allowableValues(DatabaseSystems.knownDatabaseSystems)
|
||||
.defaultValue(DEFAULT_DATABASE_SYSTEM.getValue())
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor DB_HOST = new PropertyDescriptor.Builder()
|
||||
.name("Database host")
|
||||
.description("Database host")
|
||||
.defaultValue(null)
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor DB_PORT = new PropertyDescriptor.Builder()
|
||||
.name("Database port")
|
||||
.description("Database server port")
|
||||
.defaultValue(DEFAULT_DATABASE_SYSTEM.defaultPort.toString())
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.PORT_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder()
|
||||
.name("Database driver class name")
|
||||
.description("Database driver class name")
|
||||
.defaultValue(DEFAULT_DATABASE_SYSTEM.driverClassName)
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor DB_NAME = new PropertyDescriptor.Builder()
|
||||
.name("Database name")
|
||||
.description("Database name")
|
||||
.defaultValue(null)
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
|
||||
.name("Database user")
|
||||
.description("Database user name")
|
||||
.defaultValue(null)
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder()
|
||||
.name("Password")
|
||||
.description("The password for the database user")
|
||||
.defaultValue(null)
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.sensitive(true)
|
||||
.build();
|
||||
|
||||
private static final List<PropertyDescriptor> properties;
|
||||
|
||||
static {
|
||||
List<PropertyDescriptor> props = new ArrayList<>();
|
||||
props.add(DATABASE_SYSTEM);
|
||||
props.add(DB_HOST);
|
||||
props.add(DB_PORT);
|
||||
props.add(DB_DRIVERNAME);
|
||||
props.add(DB_NAME);
|
||||
props.add(DB_USER);
|
||||
props.add(DB_PASSWORD);
|
||||
|
||||
properties = Collections.unmodifiableList(props);
|
||||
}
|
||||
|
||||
private ConfigurationContext configContext;
|
||||
private volatile BasicDataSource dataSource;
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
//================================= Apache DBCP pool parameters ================================
|
||||
|
||||
/** The maximum number of milliseconds that the pool will wait (when there are no available connections)
|
||||
* for a connection to be returned before throwing an exception, or -1 to wait indefinitely.
|
||||
*/
|
||||
static final long maxWaitMillis = 500;
|
||||
|
||||
/** The maximum number of active connections that can be allocated from this pool at the same time,
|
||||
* or negative for no limit.
|
||||
*/
|
||||
static final int maxTotal = 8;
|
||||
|
||||
//=================================================================================================
|
||||
|
||||
/**
|
||||
* Idea was to dynamically set port, driver and url properties default values after user select database system.
|
||||
* As of 01mar2015 such functionality is not supported.
|
||||
*
|
||||
@Override
|
||||
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
|
||||
super.onPropertyModified(descriptor, oldValue, newValue);
|
||||
|
||||
if (descriptor.equals(DATABASE_SYSTEM)) {
|
||||
|
||||
DatabaseSystemDescriptor databaseSystemDescriptor = DatabaseSystems.getDescriptor(newValue);
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
@OnEnabled
|
||||
public void onConfigured(final ConfigurationContext context) throws InitializationException {
|
||||
configContext = context;
|
||||
|
||||
DatabaseSystemDescriptor dbsystem = DatabaseSystems.getDescriptor( context.getProperty(DATABASE_SYSTEM).getValue() );
|
||||
|
||||
String host = context.getProperty(DB_HOST).getValue();
|
||||
Integer port = context.getProperty(DB_PORT).asInteger();
|
||||
String drv = context.getProperty(DB_DRIVERNAME).getValue();
|
||||
String dbname = context.getProperty(DB_NAME).getValue();
|
||||
String user = context.getProperty(DB_USER).getValue();
|
||||
String passw = context.getProperty(DB_PASSWORD).getValue();
|
||||
|
||||
String dburl = dbsystem.buildUrl(host, port, dbname);
|
||||
|
||||
dataSource = new BasicDataSource();
|
||||
dataSource.setMaxWait(maxWaitMillis);
|
||||
dataSource.setMaxActive(maxTotal);
|
||||
|
||||
dataSource.setUrl(dburl);
|
||||
dataSource.setDriverClassName(drv);
|
||||
dataSource.setUsername(user);
|
||||
dataSource.setPassword(passw);
|
||||
|
||||
// verify connection can be established.
|
||||
try {
|
||||
Connection con = dataSource.getConnection();
|
||||
if (con==null)
|
||||
throw new InitializationException("Connection to database cannot be established.");
|
||||
con.close();
|
||||
} catch (SQLException e) {
|
||||
throw new InitializationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection getConnection() throws ProcessException {
|
||||
try {
|
||||
Connection con = dataSource.getConnection();
|
||||
return con;
|
||||
} catch (SQLException e) {
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DBCPServiceApacheDBCP14[id=" + getIdentifier() + "]";
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.dbcp;
|
||||
|
||||
import java.text.MessageFormat;
|
||||
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
|
||||
/**
|
||||
* An immutable object for holding information about a database system.
|
||||
*
|
||||
*/
|
||||
public class DatabaseSystemDescriptor extends AllowableValue {
|
||||
|
||||
public final String driverClassName;
|
||||
public final Integer defaultPort;
|
||||
public final String urlTemplate;
|
||||
public final boolean internalDriverJar;
|
||||
|
||||
public DatabaseSystemDescriptor(String value, String description,
|
||||
String driverClassName, Integer defaultPort, String urlTemplate, boolean internalDriverJar) {
|
||||
super(value, value, description);
|
||||
|
||||
if (defaultPort==null)
|
||||
throw new IllegalArgumentException("defaultPort cannot be null");
|
||||
|
||||
this.driverClassName = driverClassName;
|
||||
this.defaultPort = defaultPort;
|
||||
this.urlTemplate = urlTemplate;
|
||||
this.internalDriverJar = internalDriverJar;
|
||||
}
|
||||
|
||||
public String buildUrl(String host, Integer port, String dbname) {
|
||||
return MessageFormat.format(urlTemplate, host, port.toString(), dbname);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.dbcp;
|
||||
|
||||
public class DatabaseSystems {
|
||||
|
||||
/**
|
||||
* {0} host name/ip
|
||||
* {1} port number
|
||||
* {2} database name
|
||||
*
|
||||
* for example url template
|
||||
* "jdbc:postgresql://{0}:{1}/{2}"
|
||||
* will be after building
|
||||
* "jdbc:postgresql://bighost:5432/Trove"
|
||||
*
|
||||
*/
|
||||
|
||||
public static DatabaseSystemDescriptor[] knownDatabaseSystems = {
|
||||
|
||||
// ================= JDBC driver jar should be included in nar (in pom.xml dependencies) =======================
|
||||
|
||||
new DatabaseSystemDescriptor("Postgres", "PostgreSQL open soure object-relational database.",
|
||||
"org.postgresql.Driver", 5432, "jdbc:postgresql://{0}:{1}/{2}", true),
|
||||
|
||||
new DatabaseSystemDescriptor("JavaDB", "Java DB is Oracle's supported distribution of the Apache Derby open source database. Included in JDK.",
|
||||
"org.apache.derby.jdbc.EmbeddedDriver", 1, "jdbc:derby:{2};create=true", true),
|
||||
|
||||
new DatabaseSystemDescriptor("Derby", "Apache Derby is an open source relational database.",
|
||||
"org.apache.derby.jdbc.EmbeddedDriver", 1, "jdbc:derby:{2};create=true", true),
|
||||
|
||||
|
||||
// ================= JDBC driver jar must be loaded from external location =======================
|
||||
|
||||
new DatabaseSystemDescriptor("MariaDB",
|
||||
"MariaDB is a community-developed fork of the MySQL relational database management system intended to remain free under the GNU GPL.",
|
||||
"org.mariadb.jdbc.Driver", 3306, "jdbc:mariadb://{0}:{1}/{2}", false),
|
||||
|
||||
new DatabaseSystemDescriptor("Oracle",
|
||||
"Oracle Database is an object-relational database management system.",
|
||||
"oracle.jdbc.OracleDriver", 1521, "jdbc:oracle:thin:@//{0}:{1}/{2}", false),
|
||||
|
||||
new DatabaseSystemDescriptor("Sybase",
|
||||
"Sybase is an relational database management system.",
|
||||
"com.sybase.jdbc3.jdbc.SybDriver", 5000, "jdbc:sybase:Tds:{0}:{1}/{2}", false),
|
||||
|
||||
|
||||
// ================= Unknown JDBC driver, user must provide connection details =====================
|
||||
|
||||
new DatabaseSystemDescriptor("Other DB", "Other JDBC compliant JDBC driver",
|
||||
null, 1, null, false),
|
||||
|
||||
};
|
||||
|
||||
public static DatabaseSystemDescriptor getDescriptor(String name) {
|
||||
for ( DatabaseSystemDescriptor descr : DatabaseSystems.knownDatabaseSystems) {
|
||||
if (descr.getValue().equalsIgnoreCase(name))
|
||||
return descr;
|
||||
}
|
||||
throw new IllegalArgumentException("Can't find DatabaseSystemDescriptor by name " + name);
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,214 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.dbcp;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.File;
|
||||
import java.sql.Connection;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.activation.DataSource;
|
||||
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
public class DBCPServiceTest {
|
||||
|
||||
final static String DB_LOCATION = "/var/tmp/testdb";
|
||||
|
||||
/**
|
||||
* Unknown database system.
|
||||
*
|
||||
*/
|
||||
@Test
|
||||
public void testBad1() throws InitializationException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||
final DBCPServiceApacheDBCP14 service = new DBCPServiceApacheDBCP14();
|
||||
final Map<String, String> properties = new HashMap<String, String>();
|
||||
properties.put(DBCPServiceApacheDBCP14.DATABASE_SYSTEM.getName(), "garbage");
|
||||
runner.addControllerService("test-bad2", service, properties);
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
/**
|
||||
* Missing property values.
|
||||
*/
|
||||
@Test
|
||||
public void testGood1() throws InitializationException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||
final DBCPServiceApacheDBCP14 service = new DBCPServiceApacheDBCP14();
|
||||
final Map<String, String> properties = new HashMap<String, String>();
|
||||
runner.addControllerService("test-bad1", service, properties);
|
||||
runner.assertNotValid(service);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test database connection using Derby.
|
||||
* Connect, create table, insert, select, drop table.
|
||||
*
|
||||
*/
|
||||
@Test
|
||||
public void testGood2() throws InitializationException, SQLException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||
final DBCPServiceApacheDBCP14 service = new DBCPServiceApacheDBCP14();
|
||||
runner.addControllerService("test-good1", service);
|
||||
|
||||
// remove previous test database, if any
|
||||
File dbLocation = new File(DB_LOCATION);
|
||||
dbLocation.delete();
|
||||
|
||||
// Should setProperty call also generate DBCPServiceApacheDBCP14.onPropertyModified() method call?
|
||||
// It does not currently.
|
||||
|
||||
// Some properties already should have JavaDB/Derby default values, let's set only missing values.
|
||||
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_HOST, "NA"); // Embedded Derby don't use host
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_NAME, DB_LOCATION);
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_USER, "tester");
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_PASSWORD, "testerp");
|
||||
|
||||
runner.enableControllerService(service);
|
||||
|
||||
runner.assertValid(service);
|
||||
DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-good1");
|
||||
Assert.assertNotNull(dbcpService);
|
||||
Connection connection = dbcpService.getConnection();
|
||||
Assert.assertNotNull(connection);
|
||||
|
||||
createInsertSelectDrop(connection);
|
||||
|
||||
connection.close(); // return to pool
|
||||
}
|
||||
|
||||
|
||||
@Rule
|
||||
public ExpectedException exception = ExpectedException.none();
|
||||
|
||||
/**
|
||||
* Test get database connection using Derby.
|
||||
* Get many times, after a while pool should not contain any available connection
|
||||
* and getConnection should fail.
|
||||
*/
|
||||
@Test
|
||||
public void testExhaustPool() throws InitializationException, SQLException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||
final DBCPServiceApacheDBCP14 service = new DBCPServiceApacheDBCP14();
|
||||
runner.addControllerService("test-exhaust", service);
|
||||
|
||||
// remove previous test database, if any
|
||||
File dbLocation = new File(DB_LOCATION);
|
||||
dbLocation.delete();
|
||||
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_HOST, "NA"); // Embedded Derby don't use host
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_NAME, DB_LOCATION);
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_USER, "tester");
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_PASSWORD, "testerp");
|
||||
|
||||
runner.enableControllerService(service);
|
||||
|
||||
runner.assertValid(service);
|
||||
DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-exhaust");
|
||||
Assert.assertNotNull(dbcpService);
|
||||
|
||||
exception.expect(ProcessException.class);
|
||||
for (int i = 0; i < 100; i++) {
|
||||
Connection connection = dbcpService.getConnection();
|
||||
Assert.assertNotNull(connection);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test get database connection using Derby.
|
||||
* Get many times, release immediately
|
||||
* and getConnection should not fail.
|
||||
*/
|
||||
@Test
|
||||
public void testGetMany() throws InitializationException, SQLException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||
final DBCPServiceApacheDBCP14 service = new DBCPServiceApacheDBCP14();
|
||||
runner.addControllerService("test-exhaust", service);
|
||||
|
||||
// remove previous test database, if any
|
||||
File dbLocation = new File(DB_LOCATION);
|
||||
dbLocation.delete();
|
||||
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_HOST, "NA"); // Embedded Derby don't use host
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_NAME, DB_LOCATION);
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_USER, "tester");
|
||||
runner.setProperty(service, DBCPServiceApacheDBCP14.DB_PASSWORD, "testerp");
|
||||
|
||||
runner.enableControllerService(service);
|
||||
|
||||
runner.assertValid(service);
|
||||
DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService("test-exhaust");
|
||||
Assert.assertNotNull(dbcpService);
|
||||
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
Connection connection = dbcpService.getConnection();
|
||||
Assert.assertNotNull(connection);
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testDriverLaod() throws ClassNotFoundException {
|
||||
Class<?> clazz = Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
|
||||
assertNotNull(clazz);
|
||||
}
|
||||
|
||||
|
||||
String createTable = "create table restaurants(id integer, name varchar(20), city varchar(50))";
|
||||
String dropTable = "drop table restaurants";
|
||||
|
||||
protected void createInsertSelectDrop( Connection con) throws SQLException {
|
||||
|
||||
Statement st = con.createStatement();
|
||||
|
||||
try {
|
||||
st.executeUpdate(dropTable);
|
||||
} catch (Exception e) {
|
||||
// table may not exist, this is not serious problem.
|
||||
}
|
||||
|
||||
st.executeUpdate(createTable);
|
||||
|
||||
st.executeUpdate("insert into restaurants values (1, 'Irifunes', 'San Mateo')");
|
||||
st.executeUpdate("insert into restaurants values (2, 'Estradas', 'Daly City')");
|
||||
st.executeUpdate("insert into restaurants values (3, 'Prime Rib House', 'San Francisco')");
|
||||
|
||||
int nrOfRows = 0;
|
||||
ResultSet resultSet = st.executeQuery("select * from restaurants");
|
||||
while (resultSet.next())
|
||||
nrOfRows++;
|
||||
assertEquals(3, nrOfRows);
|
||||
|
||||
st.close();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.dbcp;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import org.junit.Test;
|
||||
import static org.apache.nifi.dbcp.DatabaseSystems.getDescriptor;
|
||||
|
||||
public class TestDatabaseSystems {
|
||||
|
||||
@Test
|
||||
public void testKnownDatabaseSystems() {
|
||||
|
||||
assertEquals( "jdbc:postgresql://bighost:5432/Trove", getDescriptor("Postgres").buildUrl("bighost",5432,"Trove") );
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.dbcp;
|
||||
|
||||
import org.apache.nifi.dbcp.DBCPService;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
public class TestProcessor extends AbstractProcessor {
|
||||
|
||||
@Override
|
||||
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
List<PropertyDescriptor> propDescs = new ArrayList<>();
|
||||
propDescs.add(new PropertyDescriptor.Builder()
|
||||
.name("DBCPService test processor")
|
||||
.description("DBCPService test processor")
|
||||
.addValidator(StandardValidators.createControllerServiceExistsValidator(DBCPService.class))
|
||||
.required(true)
|
||||
.build());
|
||||
return propDescs;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-services</artifactId>
|
||||
<version>0.0.2-incubating-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>nifi-dbcp-service-bundle</artifactId>
|
||||
<packaging>pom</packaging>
|
||||
<modules>
|
||||
<module>nifi-dbcp-service</module>
|
||||
<module>nifi-dbcp-service-nar</module>
|
||||
</modules>
|
||||
|
||||
|
||||
</project>
|
|
@ -42,5 +42,11 @@
|
|||
<artifactId>nifi-http-context-map-api</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-dbcp-service-api</artifactId>
|
||||
<version>0.1.0-incubating-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
@ -31,5 +31,6 @@
|
|||
<module>nifi-ssl-context-service-api</module>
|
||||
<module>nifi-http-context-map-bundle</module>
|
||||
<module>nifi-standard-services-api-nar</module>
|
||||
<module>nifi-dbcp-service-api</module>
|
||||
</modules>
|
||||
</project>
|
||||
|
|
Loading…
Reference in New Issue