mirror of https://github.com/apache/nifi.git
NIFI-3426: Add dynamic property support to DBCPConnectionPool
Signed-off-by: James Wing <jvwing@gmail.com> This closes #1461.
This commit is contained in:
parent
6d4901cd26
commit
2d6d7710c7
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.nifi.dbcp;
|
package org.apache.nifi.dbcp;
|
||||||
|
|
||||||
import org.apache.commons.dbcp.BasicDataSource;
|
import org.apache.commons.dbcp.BasicDataSource;
|
||||||
|
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
||||||
|
@ -24,6 +25,7 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.controller.AbstractControllerService;
|
import org.apache.nifi.controller.AbstractControllerService;
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
|
import org.apache.nifi.expression.AttributeExpression;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
|
@ -45,6 +47,10 @@ import java.util.concurrent.TimeUnit;
|
||||||
*/
|
*/
|
||||||
@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" })
|
@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" })
|
||||||
@CapabilityDescription("Provides Database Connection Pooling Service. Connections can be asked from pool and returned after usage.")
|
@CapabilityDescription("Provides Database Connection Pooling Service. Connections can be asked from pool and returned after usage.")
|
||||||
|
@DynamicProperty(name = "JDBC property name", value = "JDBC property value", supportsExpressionLanguage = true,
|
||||||
|
description = "Specifies a property name and value to be set on the JDBC connection(s). "
|
||||||
|
+ "If Expression Language is used, evaluation will be performed upon the controller service being enabled. "
|
||||||
|
+ "Note that no flow file input (attributes, e.g.) is available for use in Expression Language constructs for these properties.")
|
||||||
public class DBCPConnectionPool extends AbstractControllerService implements DBCPService {
|
public class DBCPConnectionPool extends AbstractControllerService implements DBCPService {
|
||||||
|
|
||||||
public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
|
||||||
|
@ -148,6 +154,18 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
|
||||||
return properties;
|
return properties;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||||
|
return new PropertyDescriptor.Builder()
|
||||||
|
.name(propertyDescriptorName)
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
|
||||||
|
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.dynamic(true)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configures connection pool by creating an instance of the
|
* Configures connection pool by creating an instance of the
|
||||||
* {@link BasicDataSource} based on configuration provided with
|
* {@link BasicDataSource} based on configuration provided with
|
||||||
|
@ -192,6 +210,11 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
|
||||||
dataSource.setUrl(dburl);
|
dataSource.setUrl(dburl);
|
||||||
dataSource.setUsername(user);
|
dataSource.setUsername(user);
|
||||||
dataSource.setPassword(passw);
|
dataSource.setPassword(passw);
|
||||||
|
|
||||||
|
context.getProperties().keySet().stream().filter(PropertyDescriptor::isDynamic)
|
||||||
|
.forEach((dynamicPropDescriptor) -> dataSource.addConnectionProperty(dynamicPropDescriptor.getName(),
|
||||||
|
context.getProperty(dynamicPropDescriptor).evaluateAttributeExpressions().getValue()));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -0,0 +1,75 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.dbcp
|
||||||
|
|
||||||
|
import org.apache.nifi.reporting.InitializationException
|
||||||
|
import org.apache.nifi.util.TestRunner
|
||||||
|
import org.apache.nifi.util.TestRunners
|
||||||
|
import org.junit.Assert
|
||||||
|
import org.junit.BeforeClass
|
||||||
|
import org.junit.Test
|
||||||
|
|
||||||
|
import java.sql.Connection
|
||||||
|
import java.sql.SQLException
|
||||||
|
|
||||||
|
import static org.apache.nifi.dbcp.DBCPConnectionPool.*
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Groovy unit tests for the DBCPService module.
|
||||||
|
*/
|
||||||
|
class GroovyDBCPServiceTest {
|
||||||
|
|
||||||
|
final static String DB_LOCATION = "target/db"
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
static void setup() {
|
||||||
|
System.setProperty("derby.stream.error.file", "target/derby.log")
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test dynamic connection properties.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
void testDynamicProperties() throws InitializationException, SQLException {
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(TestProcessor)
|
||||||
|
final DBCPConnectionPool service = new DBCPConnectionPool()
|
||||||
|
runner.addControllerService("test-dynamic-properties", service)
|
||||||
|
|
||||||
|
// remove previous test database, if any
|
||||||
|
final File dbLocation = new File(DB_LOCATION)
|
||||||
|
dbLocation.deleteDir()
|
||||||
|
|
||||||
|
// set embedded Derby database connection url
|
||||||
|
runner.setProperty(service, DATABASE_URL, "jdbc:derby:" + DB_LOCATION)
|
||||||
|
runner.setProperty(service, DB_USER, "tester")
|
||||||
|
runner.setProperty(service, DB_PASSWORD, "testerp")
|
||||||
|
runner.setProperty(service, DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver")
|
||||||
|
runner.setProperty(service, "create", "true")
|
||||||
|
|
||||||
|
runner.enableControllerService(service)
|
||||||
|
|
||||||
|
runner.assertValid(service)
|
||||||
|
final DBCPService dbcpService = (DBCPService) runner.processContext.controllerServiceLookup.getControllerService("test-dynamic-properties")
|
||||||
|
Assert.assertNotNull(dbcpService)
|
||||||
|
|
||||||
|
2.times {
|
||||||
|
final Connection connection = dbcpService.getConnection()
|
||||||
|
Assert.assertNotNull(connection)
|
||||||
|
connection.close() // will return connection to pool
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue