mirror of https://github.com/apache/nifi.git
NIFI-6871: Added HikariCPConnectionPool controller service
This closes #3890 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
a3fb61d9e6
commit
b8af44d81b
|
@ -38,5 +38,10 @@
|
||||||
<artifactId>nifi-dbcp-service</artifactId>
|
<artifactId>nifi-dbcp-service</artifactId>
|
||||||
<version>1.16.0-SNAPSHOT</version>
|
<version>1.16.0-SNAPSHOT</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-hikari-dbcp-service</artifactId>
|
||||||
|
<version>1.16.0-SNAPSHOT</version>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -14,4 +14,4 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
org.apache.nifi.dbcp.DBCPConnectionPool
|
org.apache.nifi.dbcp.DBCPConnectionPool
|
||||||
org.apache.nifi.dbcp.DBCPConnectionPoolLookup
|
org.apache.nifi.dbcp.DBCPConnectionPoolLookup
|
||||||
org.apache.nifi.record.sink.db.DatabaseRecordSink
|
org.apache.nifi.record.sink.db.DatabaseRecordSink
|
||||||
|
|
|
@ -0,0 +1,109 @@
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<parent>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-dbcp-service-bundle</artifactId>
|
||||||
|
<version>1.16.0-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
<artifactId>nifi-hikari-dbcp-service</artifactId>
|
||||||
|
<packaging>jar</packaging>
|
||||||
|
<properties>
|
||||||
|
<derby.version>10.14.2.0</derby.version>
|
||||||
|
</properties>
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-dbcp-service-api</artifactId>
|
||||||
|
<version>1.16.0-SNAPSHOT</version>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-api</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-utils</artifactId>
|
||||||
|
<version>1.16.0-SNAPSHOT</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-service-utils</artifactId>
|
||||||
|
<version>1.16.0-SNAPSHOT</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-security-kerberos</artifactId>
|
||||||
|
<version>1.16.0-SNAPSHOT</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-kerberos-user-service-api</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.zaxxer</groupId>
|
||||||
|
<artifactId>HikariCP</artifactId>
|
||||||
|
<version>4.0.3</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-mock</artifactId>
|
||||||
|
<version>1.16.0-SNAPSHOT</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.slf4j</groupId>
|
||||||
|
<artifactId>jcl-over-slf4j</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.derby</groupId>
|
||||||
|
<artifactId>derby</artifactId>
|
||||||
|
<version>${derby.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.derby</groupId>
|
||||||
|
<artifactId>derbynet</artifactId>
|
||||||
|
<version>${derby.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.derby</groupId>
|
||||||
|
<artifactId>derbytools</artifactId>
|
||||||
|
<version>${derby.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.derby</groupId>
|
||||||
|
<artifactId>derbyclient</artifactId>
|
||||||
|
<version>${derby.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.rat</groupId>
|
||||||
|
<artifactId>apache-rat-plugin</artifactId>
|
||||||
|
<configuration>
|
||||||
|
<excludes combine.children="append">
|
||||||
|
<exclude>src/test/resources/fake.keytab</exclude>
|
||||||
|
</excludes>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
|
</project>
|
|
@ -0,0 +1,362 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.dbcp;
|
||||||
|
|
||||||
|
import com.zaxxer.hikari.HikariDataSource;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||||
|
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.components.PropertyValue;
|
||||||
|
import org.apache.nifi.components.resource.ResourceCardinality;
|
||||||
|
import org.apache.nifi.components.resource.ResourceType;
|
||||||
|
import org.apache.nifi.controller.AbstractControllerService;
|
||||||
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
|
import org.apache.nifi.expression.AttributeExpression;
|
||||||
|
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||||
|
import org.apache.nifi.kerberos.KerberosUserService;
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.apache.nifi.security.krb.KerberosAction;
|
||||||
|
import org.apache.nifi.security.krb.KerberosUser;
|
||||||
|
|
||||||
|
import javax.security.auth.login.LoginException;
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementation of Database Connection Pooling Service. HikariCP is used for connection pooling functionality.
|
||||||
|
*/
|
||||||
|
@RequiresInstanceClassLoading
|
||||||
|
@Tags({"dbcp", "hikari", "jdbc", "database", "connection", "pooling", "store"})
|
||||||
|
@CapabilityDescription("Provides Database Connection Pooling Service based on HikariCP. Connections can be asked from pool and returned after usage.")
|
||||||
|
@DynamicProperty(name = "JDBC property name", value = "JDBC property value", expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY,
|
||||||
|
description = "Specifies a property name and value to be set on the JDBC connection(s). "
|
||||||
|
+ "If Expression Language is used, evaluation will be performed upon the controller service being enabled. "
|
||||||
|
+ "Note that no flow file input (attributes, e.g.) is available for use in Expression Language constructs for these properties.")
|
||||||
|
public class HikariCPConnectionPool extends AbstractControllerService implements DBCPService {
|
||||||
|
/**
|
||||||
|
* Property Name Prefix for Sensitive Dynamic Properties
|
||||||
|
*/
|
||||||
|
protected static final String SENSITIVE_PROPERTY_PREFIX = "SENSITIVE.";
|
||||||
|
protected static final long INFINITE_MILLISECONDS = -1L;
|
||||||
|
|
||||||
|
private static final String DEFAULT_TOTAL_CONNECTIONS = "10";
|
||||||
|
private static final String DEFAULT_MAX_CONN_LIFETIME = "-1";
|
||||||
|
|
||||||
|
public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
|
||||||
|
.name("hikaricp-connection-url")
|
||||||
|
.displayName("Database Connection URL")
|
||||||
|
.description("A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters."
|
||||||
|
+ " The exact syntax of a database connection URL is specified by your DBMS.")
|
||||||
|
.defaultValue(null)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.required(true)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder()
|
||||||
|
.name("hikaricp-driver-classname")
|
||||||
|
.displayName("Database Driver Class Name")
|
||||||
|
.description("The fully-qualified class name of the JDBC driver. Example: com.mysql.jdbc.Driver")
|
||||||
|
.defaultValue(null)
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor DB_DRIVER_LOCATION = new PropertyDescriptor.Builder()
|
||||||
|
.name("hikaricp-driver-locations")
|
||||||
|
.displayName("Database Driver Location(s)")
|
||||||
|
.description("Comma-separated list of files/folders and/or URLs containing the driver JAR and its dependencies (if any). For example '/var/tmp/mariadb-java-client-1.1.7.jar'")
|
||||||
|
.defaultValue(null)
|
||||||
|
.required(false)
|
||||||
|
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY, ResourceType.URL)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
|
.dynamicallyModifiesClasspath(true)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
|
||||||
|
.name("hikaricp-username")
|
||||||
|
.displayName("Database User")
|
||||||
|
.description("Database user name")
|
||||||
|
.defaultValue(null)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder()
|
||||||
|
.name("hikaricp-password")
|
||||||
|
.displayName("Password")
|
||||||
|
.description("The password for the database user")
|
||||||
|
.defaultValue(null)
|
||||||
|
.required(false)
|
||||||
|
.sensitive(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder()
|
||||||
|
.name("hikaricp-max-wait-time")
|
||||||
|
.displayName("Max Wait Time")
|
||||||
|
.description("The maximum amount of time that the pool will wait (when there are no available connections) "
|
||||||
|
+ " for a connection to be returned before failing, or 0 <time units> to wait indefinitely. ")
|
||||||
|
.defaultValue("500 millis")
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||||
|
.sensitive(false)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder()
|
||||||
|
.name("hikaricp-max-total-conns")
|
||||||
|
.displayName("Max Total Connections")
|
||||||
|
.description("This property controls the maximum size that the pool is allowed to reach, including both idle and in-use connections. Basically this value will determine the "
|
||||||
|
+ "maximum number of actual connections to the database backend. A reasonable value for this is best determined by your execution environment. When the pool reaches "
|
||||||
|
+ "this size, and no idle connections are available, the service will block for up to connectionTimeout milliseconds before timing out.")
|
||||||
|
.defaultValue(DEFAULT_TOTAL_CONNECTIONS)
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.INTEGER_VALIDATOR)
|
||||||
|
.sensitive(false)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder()
|
||||||
|
.name("hikaricp-validation-query")
|
||||||
|
.displayName("Validation Query")
|
||||||
|
.description("Validation Query used to validate connections before returning them. "
|
||||||
|
+ "When connection is invalid, it gets dropped and new valid connection will be returned. "
|
||||||
|
+ "NOTE: Using validation might have some performance penalty.")
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder()
|
||||||
|
.name("hikaricp-min-idle-conns")
|
||||||
|
.displayName("Minimum Idle Connections")
|
||||||
|
.description("This property controls the minimum number of idle connections that HikariCP tries to maintain in the pool. If the idle connections dip below this value and total "
|
||||||
|
+ "connections in the pool are less than 'Max Total Connections', HikariCP will make a best effort to add additional connections quickly and efficiently. It is recommended "
|
||||||
|
+ "that this property to be set equal to 'Max Total Connections'.")
|
||||||
|
.defaultValue(DEFAULT_TOTAL_CONNECTIONS)
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder()
|
||||||
|
.name("hikaricp-max-conn-lifetime")
|
||||||
|
.displayName("Max Connection Lifetime")
|
||||||
|
.description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " +
|
||||||
|
"connection will fail the next activation, passivation or validation test. A value of zero or less " +
|
||||||
|
"means the connection has an infinite lifetime.")
|
||||||
|
.defaultValue(DEFAULT_MAX_CONN_LIFETIME)
|
||||||
|
.required(false)
|
||||||
|
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
|
||||||
|
.name("hikaricp-kerberos-user-service")
|
||||||
|
.displayName("Kerberos User Service")
|
||||||
|
.description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
|
||||||
|
.identifiesControllerService(KerberosUserService.class)
|
||||||
|
.required(false)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
private static final List<PropertyDescriptor> properties;
|
||||||
|
|
||||||
|
static {
|
||||||
|
final List<PropertyDescriptor> props = new ArrayList<>();
|
||||||
|
props.add(DATABASE_URL);
|
||||||
|
props.add(DB_DRIVERNAME);
|
||||||
|
props.add(DB_DRIVER_LOCATION);
|
||||||
|
props.add(KERBEROS_USER_SERVICE);
|
||||||
|
props.add(DB_USER);
|
||||||
|
props.add(DB_PASSWORD);
|
||||||
|
props.add(MAX_WAIT_TIME);
|
||||||
|
props.add(MAX_TOTAL_CONNECTIONS);
|
||||||
|
props.add(VALIDATION_QUERY);
|
||||||
|
props.add(MIN_IDLE);
|
||||||
|
props.add(MAX_CONN_LIFETIME);
|
||||||
|
|
||||||
|
properties = Collections.unmodifiableList(props);
|
||||||
|
}
|
||||||
|
|
||||||
|
private volatile HikariDataSource dataSource;
|
||||||
|
private volatile KerberosUser kerberosUser;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||||
|
final PropertyDescriptor.Builder builder = new PropertyDescriptor.Builder()
|
||||||
|
.name(propertyDescriptorName)
|
||||||
|
.required(false)
|
||||||
|
.dynamic(true)
|
||||||
|
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
|
||||||
|
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR);
|
||||||
|
|
||||||
|
if (propertyDescriptorName.startsWith(SENSITIVE_PROPERTY_PREFIX)) {
|
||||||
|
builder.sensitive(true).expressionLanguageSupported(ExpressionLanguageScope.NONE);
|
||||||
|
} else {
|
||||||
|
builder.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY);
|
||||||
|
}
|
||||||
|
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configures connection pool by creating an instance of the
|
||||||
|
* {@link HikariDataSource} based on configuration provided with
|
||||||
|
* {@link ConfigurationContext}.
|
||||||
|
* <p>
|
||||||
|
* This operation makes no guarantees that the actual connection could be
|
||||||
|
* made since the underlying system may still go off-line during normal
|
||||||
|
* operation of the connection pool.
|
||||||
|
*
|
||||||
|
* @param context the configuration context
|
||||||
|
*/
|
||||||
|
@OnEnabled
|
||||||
|
public void onConfigured(final ConfigurationContext context) {
|
||||||
|
|
||||||
|
final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
|
||||||
|
final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
|
||||||
|
final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
|
||||||
|
final String dburl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
|
||||||
|
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
|
||||||
|
final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
|
||||||
|
final long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions());
|
||||||
|
final int minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger();
|
||||||
|
final long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
|
||||||
|
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
|
||||||
|
|
||||||
|
if (kerberosUserService != null) {
|
||||||
|
kerberosUser = kerberosUserService.createKerberosUser();
|
||||||
|
if (kerberosUser != null) {
|
||||||
|
kerberosUser.login();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
dataSource = new HikariDataSource();
|
||||||
|
dataSource.setDriverClassName(driverName);
|
||||||
|
dataSource.setConnectionTimeout(maxWaitMillis);
|
||||||
|
dataSource.setMaximumPoolSize(maxTotal);
|
||||||
|
dataSource.setMinimumIdle(minIdle);
|
||||||
|
dataSource.setMaxLifetime(maxConnLifetimeMillis);
|
||||||
|
|
||||||
|
if (validationQuery != null && !validationQuery.isEmpty()) {
|
||||||
|
dataSource.setConnectionTestQuery(validationQuery);
|
||||||
|
}
|
||||||
|
|
||||||
|
dataSource.setJdbcUrl(dburl);
|
||||||
|
dataSource.setUsername(user);
|
||||||
|
dataSource.setPassword(passw);
|
||||||
|
|
||||||
|
final List<PropertyDescriptor> dynamicProperties = context.getProperties()
|
||||||
|
.keySet()
|
||||||
|
.stream()
|
||||||
|
.filter(PropertyDescriptor::isDynamic)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
Properties properties = dataSource.getDataSourceProperties();
|
||||||
|
dynamicProperties.forEach((descriptor) -> {
|
||||||
|
final PropertyValue propertyValue = context.getProperty(descriptor);
|
||||||
|
if (descriptor.isSensitive()) {
|
||||||
|
final String propertyName = StringUtils.substringAfter(descriptor.getName(), SENSITIVE_PROPERTY_PREFIX);
|
||||||
|
properties.setProperty(propertyName, propertyValue.getValue());
|
||||||
|
} else {
|
||||||
|
properties.setProperty(descriptor.getName(), propertyValue.evaluateAttributeExpressions().getValue());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
dataSource.setDataSourceProperties(properties);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private long extractMillisWithInfinite(PropertyValue prop) {
|
||||||
|
return "-1".equals(prop.getValue()) ? INFINITE_MILLISECONDS : prop.asTimePeriod(TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shutdown pool, close all open connections.
|
||||||
|
* If a principal is authenticated with a KDC, that principal is logged out.
|
||||||
|
* <p>
|
||||||
|
* If a @{@link LoginException} occurs while attempting to log out the @{@link org.apache.nifi.security.krb.KerberosUser},
|
||||||
|
* an attempt will still be made to shut down the pool and close open connections.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@OnDisabled
|
||||||
|
public void shutdown() {
|
||||||
|
try {
|
||||||
|
if (kerberosUser != null) {
|
||||||
|
kerberosUser.logout();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
kerberosUser = null;
|
||||||
|
try {
|
||||||
|
if (dataSource != null) {
|
||||||
|
dataSource.close();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
dataSource = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Connection getConnection() throws ProcessException {
|
||||||
|
try {
|
||||||
|
final Connection con;
|
||||||
|
if (kerberosUser != null) {
|
||||||
|
KerberosAction<Connection> kerberosAction = new KerberosAction<>(kerberosUser, () -> dataSource.getConnection(), getLogger());
|
||||||
|
con = kerberosAction.execute();
|
||||||
|
} else {
|
||||||
|
con = dataSource.getConnection();
|
||||||
|
}
|
||||||
|
return con;
|
||||||
|
} catch (final SQLException e) {
|
||||||
|
// If using Kerberos, attempt to re-login
|
||||||
|
if (kerberosUser != null) {
|
||||||
|
getLogger().info("Error getting connection, performing Kerberos re-login");
|
||||||
|
kerberosUser.login();
|
||||||
|
}
|
||||||
|
throw new ProcessException("Connection retrieval failed", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return String.format("%s[id=%s]", getClass().getSimpleName(), getIdentifier());
|
||||||
|
}
|
||||||
|
|
||||||
|
HikariDataSource getDataSource() {
|
||||||
|
return dataSource;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,15 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
# contributor license agreements. See the NOTICE file distributed with
|
||||||
|
# this work for additional information regarding copyright ownership.
|
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
# (the "License"); you may not use this file except in compliance with
|
||||||
|
# the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
org.apache.nifi.dbcp.HikariCPConnectionPool
|
|
@ -0,0 +1,257 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.dbcp;
|
||||||
|
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
|
import org.apache.nifi.util.NoOpProcessor;
|
||||||
|
import org.apache.nifi.util.TestRunner;
|
||||||
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.ResultSet;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.sql.Statement;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class HikariCPConnectionPoolTest {
|
||||||
|
private final static String DB_LOCATION = "target/db";
|
||||||
|
private final static String GOOD_CS_NAME = "test-good1";
|
||||||
|
private final static String BAD_CS_NAME = "test-bad1";
|
||||||
|
private final static String EXHAUST_CS_NAME = "test-exhaust";
|
||||||
|
|
||||||
|
private static String originalDerbyStreamErrorFile;
|
||||||
|
|
||||||
|
private TestRunner runner;
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
public static void setupBeforeClass() {
|
||||||
|
originalDerbyStreamErrorFile = System.getProperty("derby.stream.error.file");
|
||||||
|
System.setProperty("derby.stream.error.file", "target/derby.log");
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
public static void shutdownAfterClass() {
|
||||||
|
if (originalDerbyStreamErrorFile != null) {
|
||||||
|
System.setProperty("derby.stream.error.file", originalDerbyStreamErrorFile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setup() {
|
||||||
|
// remove previous test database, if any
|
||||||
|
final File dbLocation = new File(DB_LOCATION);
|
||||||
|
dbLocation.delete();
|
||||||
|
|
||||||
|
runner = TestRunners.newTestRunner(NoOpProcessor.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Missing property values.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMissingPropertyValues() throws InitializationException {
|
||||||
|
final HikariCPConnectionPool service = new HikariCPConnectionPool();
|
||||||
|
final Map<String, String> properties = new HashMap<>();
|
||||||
|
runner.addControllerService(BAD_CS_NAME, service, properties);
|
||||||
|
runner.assertNotValid(service);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Max wait set to -1
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMaxWait() throws InitializationException {
|
||||||
|
final HikariCPConnectionPool service = new HikariCPConnectionPool();
|
||||||
|
runner.addControllerService(GOOD_CS_NAME, service);
|
||||||
|
|
||||||
|
setDerbyProperties(service);
|
||||||
|
runner.setProperty(service, HikariCPConnectionPool.MAX_WAIT_TIME, "0 millis");
|
||||||
|
|
||||||
|
runner.enableControllerService(service);
|
||||||
|
runner.assertValid(service);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks validity of idle limit and time settings including a default
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testIdleConnectionsSettings() throws InitializationException {
|
||||||
|
final HikariCPConnectionPool service = new HikariCPConnectionPool();
|
||||||
|
runner.addControllerService(GOOD_CS_NAME, service);
|
||||||
|
|
||||||
|
setDerbyProperties(service);
|
||||||
|
runner.setProperty(service, HikariCPConnectionPool.MAX_WAIT_TIME, "0 millis");
|
||||||
|
runner.setProperty(service, HikariCPConnectionPool.MAX_CONN_LIFETIME, "1 secs");
|
||||||
|
|
||||||
|
runner.enableControllerService(service);
|
||||||
|
runner.assertValid(service);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMinIdleCannotBeNegative() throws InitializationException {
|
||||||
|
final HikariCPConnectionPool service = new HikariCPConnectionPool();
|
||||||
|
runner.addControllerService(GOOD_CS_NAME, service);
|
||||||
|
|
||||||
|
setDerbyProperties(service);
|
||||||
|
runner.setProperty(service, HikariCPConnectionPool.MAX_WAIT_TIME, "0 millis");
|
||||||
|
runner.setProperty(service, HikariCPConnectionPool.MIN_IDLE, "-1");
|
||||||
|
|
||||||
|
runner.assertNotValid(service);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks to ensure that settings have been passed down into the HikariCP
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testIdleSettingsAreSet() throws InitializationException {
|
||||||
|
final HikariCPConnectionPool service = new HikariCPConnectionPool();
|
||||||
|
runner.addControllerService(GOOD_CS_NAME, service);
|
||||||
|
|
||||||
|
setDerbyProperties(service);
|
||||||
|
runner.setProperty(service, HikariCPConnectionPool.MAX_WAIT_TIME, "0 millis");
|
||||||
|
runner.setProperty(service, HikariCPConnectionPool.MIN_IDLE, "4");
|
||||||
|
runner.setProperty(service, HikariCPConnectionPool.MAX_CONN_LIFETIME, "1 secs");
|
||||||
|
|
||||||
|
runner.enableControllerService(service);
|
||||||
|
|
||||||
|
Assertions.assertEquals(4, service.getDataSource().getMinimumIdle());
|
||||||
|
Assertions.assertEquals(1000, service.getDataSource().getMaxLifetime());
|
||||||
|
|
||||||
|
service.getDataSource().close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test database connection using Derby. Connect, create table, insert, select, drop table.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCreateInsertSelect() throws InitializationException, SQLException {
|
||||||
|
final HikariCPConnectionPool service = new HikariCPConnectionPool();
|
||||||
|
runner.addControllerService(GOOD_CS_NAME, service);
|
||||||
|
|
||||||
|
setDerbyProperties(service);
|
||||||
|
|
||||||
|
runner.enableControllerService(service);
|
||||||
|
|
||||||
|
runner.assertValid(service);
|
||||||
|
final DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService(GOOD_CS_NAME);
|
||||||
|
Assertions.assertNotNull(dbcpService);
|
||||||
|
final Connection connection = dbcpService.getConnection();
|
||||||
|
Assertions.assertNotNull(connection);
|
||||||
|
|
||||||
|
createInsertSelectDrop(connection);
|
||||||
|
|
||||||
|
connection.close(); // return to pool
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test get database connection using Derby. Get many times, after a while pool should not contain any available connection and getConnection should fail.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testExhaustPool() throws InitializationException {
|
||||||
|
final HikariCPConnectionPool service = new HikariCPConnectionPool();
|
||||||
|
runner.addControllerService(EXHAUST_CS_NAME, service);
|
||||||
|
|
||||||
|
setDerbyProperties(service);
|
||||||
|
|
||||||
|
runner.enableControllerService(service);
|
||||||
|
|
||||||
|
runner.assertValid(service);
|
||||||
|
Assertions.assertDoesNotThrow(() -> {
|
||||||
|
runner.getProcessContext().getControllerServiceLookup().getControllerService(EXHAUST_CS_NAME);
|
||||||
|
});
|
||||||
|
final DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService(EXHAUST_CS_NAME);
|
||||||
|
Assertions.assertNotNull(dbcpService);
|
||||||
|
|
||||||
|
try {
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
final Connection connection = dbcpService.getConnection();
|
||||||
|
Assertions.assertNotNull(connection);
|
||||||
|
}
|
||||||
|
Assertions.fail("Should have exhausted the pool and thrown a ProcessException");
|
||||||
|
} catch (ProcessException pe) {
|
||||||
|
// Do nothing, this is expected
|
||||||
|
} catch (Throwable t) {
|
||||||
|
Assertions.fail("Should have exhausted the pool and thrown a ProcessException but threw " + t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test get database connection using Derby. Get many times, release immediately and getConnection should not fail.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testGetManyNormal() throws InitializationException, SQLException {
|
||||||
|
final HikariCPConnectionPool service = new HikariCPConnectionPool();
|
||||||
|
runner.addControllerService(EXHAUST_CS_NAME, service);
|
||||||
|
|
||||||
|
setDerbyProperties(service);
|
||||||
|
|
||||||
|
runner.enableControllerService(service);
|
||||||
|
|
||||||
|
runner.assertValid(service);
|
||||||
|
final DBCPService dbcpService = (DBCPService) runner.getProcessContext().getControllerServiceLookup().getControllerService(EXHAUST_CS_NAME);
|
||||||
|
Assertions.assertNotNull(dbcpService);
|
||||||
|
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
final Connection connection = dbcpService.getConnection();
|
||||||
|
Assertions.assertNotNull(connection);
|
||||||
|
connection.close(); // will return connection to pool
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createInsertSelectDrop(Connection con) throws SQLException {
|
||||||
|
|
||||||
|
final Statement st = con.createStatement();
|
||||||
|
|
||||||
|
try {
|
||||||
|
String dropTable = "drop table restaurants";
|
||||||
|
st.executeUpdate(dropTable);
|
||||||
|
} catch (final Exception e) {
|
||||||
|
// table may not exist, this is not serious problem.
|
||||||
|
}
|
||||||
|
|
||||||
|
String createTable = "create table restaurants(id integer, name varchar(20), city varchar(50))";
|
||||||
|
st.executeUpdate(createTable);
|
||||||
|
|
||||||
|
st.executeUpdate("insert into restaurants values (1, 'Irifunes', 'San Mateo')");
|
||||||
|
st.executeUpdate("insert into restaurants values (2, 'Estradas', 'Daly City')");
|
||||||
|
st.executeUpdate("insert into restaurants values (3, 'Prime Rib House', 'San Francisco')");
|
||||||
|
|
||||||
|
int nrOfRows = 0;
|
||||||
|
final ResultSet resultSet = st.executeQuery("select * from restaurants");
|
||||||
|
while (resultSet.next()) {
|
||||||
|
nrOfRows++;
|
||||||
|
}
|
||||||
|
Assertions.assertEquals(3, nrOfRows);
|
||||||
|
|
||||||
|
st.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setDerbyProperties(final HikariCPConnectionPool service) {
|
||||||
|
runner.setProperty(service, HikariCPConnectionPool.DATABASE_URL, "jdbc:derby:" + DB_LOCATION + ";create=true");
|
||||||
|
runner.setProperty(service, HikariCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver");
|
||||||
|
runner.setProperty(service, HikariCPConnectionPool.DB_USER, "tester");
|
||||||
|
runner.setProperty(service, HikariCPConnectionPool.DB_PASSWORD, "testerp");
|
||||||
|
}
|
||||||
|
}
|
|
@ -24,6 +24,7 @@
|
||||||
<packaging>pom</packaging>
|
<packaging>pom</packaging>
|
||||||
<modules>
|
<modules>
|
||||||
<module>nifi-dbcp-service</module>
|
<module>nifi-dbcp-service</module>
|
||||||
|
<module>nifi-hikari-dbcp-service</module>
|
||||||
<module>nifi-dbcp-service-nar</module>
|
<module>nifi-dbcp-service-nar</module>
|
||||||
</modules>
|
</modules>
|
||||||
</project>
|
</project>
|
||||||
|
|
Loading…
Reference in New Issue