NIFI-5229 Adding a DBCPService implementation that can lookup other DBCPServices dynamically at runtime

This closes #2735

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
Bryan Bende 2018-05-23 11:21:12 -04:00 committed by Mike Thomsen
parent f15e6c7ab3
commit cf6089196f
3 changed files with 324 additions and 1 deletions

View File

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dbcp;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" })
@CapabilityDescription("Provides a DBCPService that can be used to dynamically select another DBCPService. This service " +
"requires an attribute named 'database.name' to be passed in when asking for a connection, and will throw an exception " +
"if the attribute is missing. The value of 'database.name' will be used to select the DBCPService that has been " +
"registered with that name. This will allow multiple DBCPServices to be defined and registered, and then selected " +
"dynamically at runtime by tagging flow files with the appropriate 'database.name' attribute.")
@DynamicProperty(name = "The ", value = "JDBC property value", expressionLanguageScope = ExpressionLanguageScope.NONE,
description = "")
public class DBCPConnectionPoolLookup extends AbstractControllerService implements DBCPService {
public static final String DATABASE_NAME_ATTRIBUTE = "database.name";
private volatile Map<String,DBCPService> dbcpServiceMap;
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.description("The DBCPService to return when database.name = '" + propertyDescriptorName + "'")
.identifiesControllerService(DBCPService.class)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
final List<ValidationResult> results = new ArrayList<>();
int numDefinedServices = 0;
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
if (descriptor.isDynamic()) {
numDefinedServices++;
}
final String referencedId = context.getProperty(descriptor).getValue();
if (this.getIdentifier().equals(referencedId)) {
results.add(new ValidationResult.Builder()
.subject(descriptor.getDisplayName())
.explanation("the current service cannot be registered as a DBCPService to lookup")
.valid(false)
.build());
}
}
if (numDefinedServices == 0) {
results.add(new ValidationResult.Builder()
.subject(this.getClass().getSimpleName())
.explanation("at least one DBCPService must be defined via dynamic properties")
.valid(false)
.build());
}
return results;
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
final Map<String,DBCPService> serviceMap = new HashMap<>();
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
if (descriptor.isDynamic()) {
final DBCPService dbcpService = context.getProperty(descriptor).asControllerService(DBCPService.class);
serviceMap.put(descriptor.getName(), dbcpService);
}
}
dbcpServiceMap = Collections.unmodifiableMap(serviceMap);
}
@OnDisabled
public void onDisabled() {
dbcpServiceMap = null;
}
@Override
public Connection getConnection() throws ProcessException {
throw new UnsupportedOperationException("Cannot lookup DBCPConnectionPool without attributes");
}
@Override
public Connection getConnection(Map<String, String> attributes) throws ProcessException {
if (!attributes.containsKey(DATABASE_NAME_ATTRIBUTE)) {
throw new ProcessException("Attributes must contain an attribute name '" + DATABASE_NAME_ATTRIBUTE + "'");
}
final String databaseName = attributes.get(DATABASE_NAME_ATTRIBUTE);
if (StringUtils.isBlank(databaseName)) {
throw new ProcessException(DATABASE_NAME_ATTRIBUTE + " cannot be null or blank");
}
final DBCPService dbcpService = dbcpServiceMap.get(databaseName);
if (dbcpService == null) {
throw new ProcessException("No DBCPService was found for database.name '" + databaseName + "'");
}
return dbcpService.getConnection(attributes);
}
}

View File

@ -12,4 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.dbcp.DBCPConnectionPool
org.apache.nifi.dbcp.DBCPConnectionPool
org.apache.nifi.dbcp.DBCPConnectionPoolLookup

View File

@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dbcp;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import java.sql.Connection;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestDBCPConnectionPoolLookup {
private MockConnection connectionA;
private MockConnection connectionB;
private MockDBCPService dbcpServiceA;
private MockDBCPService dbcpServiceB;
private DBCPService dbcpLookupService;
private TestRunner runner;
@Before
public void setup() throws InitializationException {
connectionA = mock(MockConnection.class);
when(connectionA.getName()).thenReturn("A");
connectionB = mock(MockConnection.class);
when(connectionB.getName()).thenReturn("B");
dbcpServiceA = new MockDBCPService(connectionA);
dbcpServiceB = new MockDBCPService(connectionB);
dbcpLookupService = new DBCPConnectionPoolLookup();
runner = TestRunners.newTestRunner(TestProcessor.class);
final String dbcpServiceAIdentifier = "dbcp-a";
runner.addControllerService(dbcpServiceAIdentifier, dbcpServiceA);
final String dbcpServiceBIdentifier = "dbcp-b";
runner.addControllerService(dbcpServiceBIdentifier, dbcpServiceB);
runner.addControllerService("dbcp-lookup", dbcpLookupService);
runner.setProperty(dbcpLookupService, "a", dbcpServiceAIdentifier);
runner.setProperty(dbcpLookupService, "b", dbcpServiceBIdentifier);
runner.enableControllerService(dbcpServiceA);
runner.enableControllerService(dbcpServiceB);
runner.enableControllerService(dbcpLookupService);
}
@Test
public void testLookupServiceA() {
final Map<String,String> attributes = new HashMap<>();
attributes.put(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "a");
final Connection connection = dbcpLookupService.getConnection(attributes);
assertNotNull(connection);
assertTrue(connection instanceof MockConnection);
final MockConnection mockConnection = (MockConnection)connection;
assertEquals(connectionA.getName(), mockConnection.getName());
}
@Test
public void testLookupServiceB() {
final Map<String,String> attributes = new HashMap<>();
attributes.put(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "b");
final Connection connection = dbcpLookupService.getConnection(attributes);
assertNotNull(connection);
assertTrue(connection instanceof MockConnection);
final MockConnection mockConnection = (MockConnection)connection;
assertEquals(connectionB.getName(), mockConnection.getName());
}
@Test(expected = UnsupportedOperationException.class)
public void testLookupWithoutAttributes() {
dbcpLookupService.getConnection();
}
@Test(expected = ProcessException.class)
public void testLookupMissingDatabaseNameAttribute() {
final Map<String,String> attributes = new HashMap<>();
dbcpLookupService.getConnection(attributes);
}
@Test(expected = ProcessException.class)
public void testLookupWithDatabaseNameThatDoesNotExist() {
final Map<String,String> attributes = new HashMap<>();
attributes.put(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "DOES-NOT-EXIST");
dbcpLookupService.getConnection(attributes);
}
@Test
public void testCustomValidateAtLeaseOneServiceDefined() throws InitializationException {
// enable lookup service with no services registered, verify not valid
runner = TestRunners.newTestRunner(TestProcessor.class);
runner.addControllerService("dbcp-lookup", dbcpLookupService);
runner.enableControllerService(dbcpLookupService);
runner.assertNotValid(dbcpLookupService);
final String dbcpServiceAIdentifier = "dbcp-a";
runner.addControllerService(dbcpServiceAIdentifier, dbcpServiceA);
runner.enableControllerService(dbcpServiceA);
// register a service and now verify valid
runner.disableControllerService(dbcpLookupService);
runner.setProperty(dbcpLookupService, "a", dbcpServiceAIdentifier);
runner.enableControllerService(dbcpLookupService);
runner.assertValid(dbcpLookupService);
}
@Test
public void testCustomValidateSelfReferenceNotAllowed() throws InitializationException {
runner = TestRunners.newTestRunner(TestProcessor.class);
runner.addControllerService("dbcp-lookup", dbcpLookupService);
runner.setProperty(dbcpLookupService, "dbcp-lookup", "dbcp-lookup");
runner.enableControllerService(dbcpLookupService);
runner.assertNotValid(dbcpLookupService);
}
/**
* A mock DBCPService that will always return the passed in MockConnection.
*/
private static class MockDBCPService extends AbstractControllerService implements DBCPService {
private MockConnection connection;
public MockDBCPService(MockConnection connection) {
this.connection = connection;
}
@Override
public Connection getConnection() throws ProcessException {
return connection;
}
@Override
public Connection getConnection(Map<String, String> attributes) throws ProcessException {
return connection;
}
}
/**
* A mock Connection that will allow us to mock getName so we can identify we have the expected Connection.
*/
private interface MockConnection extends Connection {
String getName();
}
}