From 6412097eb2e42ed09f0683c74f72e421cab1652c Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Thu, 12 Mar 2020 15:57:11 -0400 Subject: [PATCH] NIFI-7257 Added HadoopDBCPConnectionPool - Updated InstanceClassLoader to resolve files that are in the instance urls or additional urls - Updated nifi-mock to support KerberosContext and removeProperty for ControllerServices - Added unit test for HadoopDBCPConnectionPool - Addressing review feedback This closes #4149. --- nifi-assembly/pom.xml | 6 + ...ontrollerServiceInitializationContext.java | 17 +- .../apache/nifi/util/MockKerberosContext.java | 53 ++ .../util/StandardProcessorTestRunner.java | 89 ++- .../java/org/apache/nifi/util/TestRunner.java | 34 +- .../apache/nifi/nar/InstanceClassLoader.java | 43 ++ .../nifi/nar/TestInstanceClassLoader.java | 54 ++ .../nifi-hadoop-dbcp-service-nar/pom.xml | 42 ++ .../src/main/resources/META-INF/LICENSE | 202 ++++++ .../src/main/resources/META-INF/NOTICE | 41 ++ .../nifi-hadoop-dbcp-service/pom.xml | 110 ++++ .../nifi/dbcp/HadoopDBCPConnectionPool.java | 607 ++++++++++++++++++ .../apache/nifi/dbcp/ValidationResources.java | 39 ++ ...g.apache.nifi.controller.ControllerService | 15 + .../dbcp/HadoopDBCPConnectionPoolTest.java | 154 +++++ .../org/apache/nifi/dbcp/TestProcessor.java | 45 ++ .../src/test/resources/core-site-security.xml | 30 + .../src/test/resources/core-site.xml | 22 + .../src/test/resources/fake.keytab | 0 .../src/test/resources/krb5.conf | 12 + .../nifi-hadoop-dbcp-service-bundle/pom.xml | 29 + .../nifi-standard-services/pom.xml | 1 + 22 files changed, 1605 insertions(+), 40 deletions(-) create mode 100644 nifi-mock/src/main/java/org/apache/nifi/util/MockKerberosContext.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/test/java/org/apache/nifi/nar/TestInstanceClassLoader.java create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service-nar/src/main/resources/META-INF/LICENSE create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/pom.xml create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPool.java create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/java/org/apache/nifi/dbcp/ValidationResources.java create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPoolTest.java create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestProcessor.java create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/resources/core-site-security.xml create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/resources/core-site.xml create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/resources/fake.keytab create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/resources/krb5.conf create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/pom.xml diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 7ee07c317c..7f96c8758d 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -277,6 +277,12 @@ language governing permissions and limitations under the License. --> 1.12.0-SNAPSHOT nar + + org.apache.nifi + nifi-hadoop-dbcp-service-nar + 1.12.0-SNAPSHOT + nar + org.apache.nifi nifi-mongodb-client-service-api-nar diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java index e02eabaf26..021bdc2459 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockControllerServiceInitializationContext.java @@ -16,15 +16,17 @@ */ package org.apache.nifi.util; -import java.io.File; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceInitializationContext; import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.NodeTypeProvider; +import org.apache.nifi.kerberos.KerberosContext; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.state.MockStateManager; +import java.io.File; + public class MockControllerServiceInitializationContext extends MockControllerServiceLookup implements ControllerServiceInitializationContext, ControllerServiceLookup, NodeTypeProvider { private final String identifier; @@ -32,6 +34,7 @@ public class MockControllerServiceInitializationContext extends MockControllerSe private final StateManager stateManager; private volatile boolean isClustered; private volatile boolean isPrimaryNode; + private final KerberosContext kerberosContext; public MockControllerServiceInitializationContext(final ControllerService controllerService, final String identifier) { this(controllerService, identifier, new MockStateManager(controllerService)); @@ -42,9 +45,15 @@ public class MockControllerServiceInitializationContext extends MockControllerSe } public MockControllerServiceInitializationContext(final ControllerService controllerService, final String identifier, final ComponentLog logger, final StateManager stateManager) { + this(controllerService, identifier, logger, stateManager, null); + } + + public MockControllerServiceInitializationContext(final ControllerService controllerService, final String identifier, final ComponentLog logger, final StateManager stateManager, + final KerberosContext kerberosContext) { this.identifier = identifier; this.logger = logger; this.stateManager = stateManager; + this.kerberosContext = kerberosContext; addControllerService(controllerService, identifier); } @@ -80,17 +89,17 @@ public class MockControllerServiceInitializationContext extends MockControllerSe @Override public String getKerberosServicePrincipal() { - return null; //this needs to be wired in. + return kerberosContext != null ? kerberosContext.getKerberosServicePrincipal() : null; } @Override public File getKerberosServiceKeytab() { - return null; //this needs to be wired in. + return kerberosContext != null ? kerberosContext.getKerberosServiceKeytab() : null; } @Override public File getKerberosConfigurationFile() { - return null; //this needs to be wired in. + return kerberosContext != null ? kerberosContext.getKerberosConfigurationFile() : null; } @Override diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockKerberosContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockKerberosContext.java new file mode 100644 index 0000000000..480eab8699 --- /dev/null +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockKerberosContext.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import org.apache.nifi.kerberos.KerberosContext; + +import java.io.File; + +public class MockKerberosContext implements KerberosContext { + + private final String kerberosServicePrincipal; + private final File kerberosServiceKeytab; + private final File kerberosConfigFile; + + public MockKerberosContext(final File kerberosConfigFile) { + this(null, null, kerberosConfigFile); + } + + public MockKerberosContext(final String kerberosServicePrincipal, final File kerberosServiceKeytab, final File kerberosConfigFile) { + this.kerberosServicePrincipal = kerberosServicePrincipal; + this.kerberosServiceKeytab = kerberosServiceKeytab; + this.kerberosConfigFile = kerberosConfigFile; + } + + @Override + public String getKerberosServicePrincipal() { + return kerberosServicePrincipal; + } + + @Override + public File getKerberosServiceKeytab() { + return kerberosServiceKeytab; + } + + @Override + public File getKerberosConfigurationFile() { + return kerberosConfigFile; + } +} diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 40010c6b07..645a585fc2 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -16,34 +16,6 @@ */ package org.apache.nifi.util; -import static java.util.Objects.requireNonNull; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.lang.reflect.InvocationTargetException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Predicate; - import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; @@ -74,6 +46,34 @@ import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.state.MockStateManager; import org.junit.Assert; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; + +import static java.util.Objects.requireNonNull; + public class StandardProcessorTestRunner implements TestRunner { private final Processor processor; @@ -618,7 +618,8 @@ public class StandardProcessorTestRunner implements TestRunner { final MockComponentLog logger = new MockComponentLog(identifier, service); controllerServiceLoggers.put(identifier, logger); final MockStateManager serviceStateManager = new MockStateManager(service); - final MockControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(requireNonNull(service), requireNonNull(identifier), logger, serviceStateManager); + final MockControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext( + requireNonNull(service), requireNonNull(identifier), logger, serviceStateManager, kerberosContext); controllerServiceStateManagers.put(identifier, serviceStateManager); initContext.addControllerServices(context); service.initialize(initContext); @@ -839,6 +840,36 @@ public class StandardProcessorTestRunner implements TestRunner { return context.removeProperty(property); } + @Override + public boolean removeProperty(final ControllerService service, final PropertyDescriptor property) { + final MockStateManager serviceStateManager = controllerServiceStateManagers.get(service.getIdentifier()); + if (serviceStateManager == null) { + throw new IllegalStateException("Controller service " + service + " has not been added to this TestRunner via the #addControllerService method"); + } + + final ControllerServiceConfiguration configuration = getConfigToUpdate(service); + final Map curProps = configuration.getProperties(); + final Map updatedProps = new HashMap<>(curProps); + + final String oldValue = updatedProps.remove(property); + if (oldValue == null) { + return false; + } + + configuration.setProperties(updatedProps); + service.onPropertyModified(property, oldValue, null); + return true; + } + + @Override + public boolean removeProperty(ControllerService service, String propertyName) { + final PropertyDescriptor descriptor = service.getPropertyDescriptor(propertyName); + if (descriptor == null) { + return false; + } + return removeProperty(service, descriptor); + } + @Override public void clearProperties() { context.clearProperties(); diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java index a15c806501..ce5a837fd8 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java @@ -16,13 +16,6 @@ */ package org.apache.nifi.util; -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Path; -import java.util.List; -import java.util.Map; -import java.util.function.Predicate; - import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationResult; @@ -38,6 +31,13 @@ import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.state.MockStateManager; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + public interface TestRunner { /** @@ -855,6 +855,16 @@ public interface TestRunner { */ boolean removeProperty(PropertyDescriptor descriptor); + /** + * Removes the {@link PropertyDescriptor} from the ControllerService, + * effectively setting its value to null, or the property's default value, if it has one. + * + * @param controllerService the controller service to remove the property from + * @param descriptor of property to remove + * @return true if removed, false if the property was not set + */ + boolean removeProperty(ControllerService controllerService, PropertyDescriptor descriptor); + /** * Removes the property from the {@link ProcessContext}, * effectively setting its value to null, or the property's default value, if it has one. @@ -864,6 +874,16 @@ public interface TestRunner { */ boolean removeProperty(String property); + /** + * Removes the {@link PropertyDescriptor} from the ControllerService, + * effectively setting its value to null, or the property's default value, if it has one. + * + * @param controllerService the controller service to remove the property from + * @param property name of the property to remove + * @return true if removed, false if the property was not set + */ + boolean removeProperty(ControllerService controllerService, String property); + /** * Clears all set properties from the {@link ProcessContext}. */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java index bf78768bd0..e1c1e0115b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/nar/InstanceClassLoader.java @@ -20,6 +20,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.util.ArrayList; @@ -72,6 +73,48 @@ public class InstanceClassLoader extends AbstractNativeLibHandlingClassLoader { additionalResourceUrls == null ? Collections.emptySet() : new LinkedHashSet<>(additionalResourceUrls)); } + /** + * Note: Normally URLClassLoader will only load resources that are inside JARs, or in directories, but many times we allow + * properties to specify specific files to add to the classpath. This allows those files to be found by checking the known + * URLs of the InstanceClassLoader, when the resource wasn't find in the parent hierarchy. + */ + @Override + public URL findResource(String name) { + URL resourceUrl = super.findResource(name); + + if (resourceUrl == null) { + resourceUrl = findResource(instanceUrls, name); + } + + if (resourceUrl == null) { + resourceUrl = findResource(additionalResourceUrls, name); + } + + return resourceUrl; + } + + private URL findResource(final Set urls, final String name) { + if (urls == null || name == null) { + return null; + } + + for (final URL url : urls) { + try { + final URI uri = url.toURI(); + final File file = new File(uri); + if (name.equals(file.getName())) { + logger.debug("Found resource '" + name + "' from URL '" + url.toExternalForm() + "'"); + return url; + } + } catch (URISyntaxException e) { + logger.error(e.getMessage(), e); + return null; + } + } + + return null; + } + private static List initNativeLibDirList(Set narNativeLibDirs, Set additionalResourceUrls) { List nativeLibDirList = new ArrayList<>(narNativeLibDirs); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/test/java/org/apache/nifi/nar/TestInstanceClassLoader.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/test/java/org/apache/nifi/nar/TestInstanceClassLoader.java new file mode 100644 index 0000000000..12256faf8c --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/test/java/org/apache/nifi/nar/TestInstanceClassLoader.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.nar; + +import org.junit.Test; + +import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Collections; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertEquals; + +public class TestInstanceClassLoader { + + @Test + public void testFindResourceWhenFile() throws MalformedURLException { + final File nifiProperties = new File("src/test/resources/nifi.properties"); + assertTrue(nifiProperties.exists()); + + final URL nifiPropertiesURL = nifiProperties.toURI().toURL(); + + final ClassLoader instanceClassLoader = new InstanceClassLoader( + "id", + "org.apache.nifi.processors.MyProcessor", + Collections.emptySet(), + Collections.singleton(nifiPropertiesURL), + null); + + final URL nifiPropertiesResource = instanceClassLoader.getResource(nifiProperties.getName()); + assertNotNull(nifiPropertiesResource); + assertEquals(nifiPropertiesURL.toExternalForm(), nifiPropertiesResource.toExternalForm()); + + final URL doesNotExistResource = instanceClassLoader.getResource("does-not-exist.txt"); + assertNull(doesNotExistResource); + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service-nar/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service-nar/pom.xml new file mode 100644 index 0000000000..4bc28fe528 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service-nar/pom.xml @@ -0,0 +1,42 @@ + + + 4.0.0 + + org.apache.nifi + nifi-hadoop-dbcp-service-bundle + 1.12.0-SNAPSHOT + + + nifi-hadoop-dbcp-service-nar + nar + + true + true + + + + org.apache.nifi + nifi-standard-services-api-nar + 1.12.0-SNAPSHOT + nar + + + org.apache.nifi + nifi-hadoop-dbcp-service + 1.12.0-SNAPSHOT + + + diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..7a4a3ea242 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..b92d22e523 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,41 @@ +nifi-hadoop-dbcp-service-nar +Copyright 2014-2020 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +=========================================== +Apache Software License v2 +=========================================== + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Apache Commons DBCP + The following NOTICE information applies: + Apache Commons DBCP + Copyright 2001-2018 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + (ASLv2) Apache Commons Pool + The following NOTICE information applies: + Apache Commons Pool + Copyright 2001-2018 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + (ASLv2) Apache Derby + The following NOTICE information applies: + Apache Derby + Copyright 2004-2014 Apache, Apache DB, Apache Derby, Apache Torque, Apache JDO, Apache DDLUtils, + the Derby hat logo, the Apache JDO logo, and the Apache feather logo are trademarks of The Apache Software Foundation. diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/pom.xml new file mode 100644 index 0000000000..a7f290bd72 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/pom.xml @@ -0,0 +1,110 @@ + + + 4.0.0 + + org.apache.nifi + nifi-hadoop-dbcp-service-bundle + 1.12.0-SNAPSHOT + + nifi-hadoop-dbcp-service + jar + + + org.apache.nifi + nifi-dbcp-service-api + 1.12.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-utils + 1.12.0-SNAPSHOT + + + org.apache.nifi + nifi-service-utils + 1.12.0-SNAPSHOT + + + org.apache.nifi + nifi-security-utils + 1.12.0-SNAPSHOT + + + org.apache.nifi + nifi-hadoop-utils + 1.12.0-SNAPSHOT + + + org.apache.nifi + nifi-kerberos-credentials-service-api + 1.12.0-SNAPSHOT + provided + + + org.apache.commons + commons-dbcp2 + 2.7.0 + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + provided + + + org.slf4j + slf4j-log4j12 + + + + + + org.apache.nifi + nifi-mock + 1.12.0-SNAPSHOT + test + + + org.apache.nifi + nifi-mock-record-utils + 1.12.0-SNAPSHOT + test + + + + + + + org.apache.rat + apache-rat-plugin + + + src/test/resources/fake.keytab + src/test/resources/krb5.conf + + + + + + + diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPool.java b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPool.java new file mode 100644 index 0000000000..2b7925c8bb --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPool.java @@ -0,0 +1,607 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.dbcp; + +import org.apache.commons.dbcp2.BasicDataSource; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.hadoop.KerberosProperties; +import org.apache.nifi.hadoop.SecurityUtil; +import org.apache.nifi.kerberos.KerberosCredentialsService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.security.krb.KerberosKeytabUser; +import org.apache.nifi.security.krb.KerberosPasswordUser; +import org.apache.nifi.security.krb.KerberosUser; + +import javax.security.auth.login.LoginException; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.PrivilegedExceptionAction; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Implementation of Database Connection Pooling Service for Hadoop related JDBC Service. + * Apache DBCP is used for connection pooling functionality. + * + */ +@RequiresInstanceClassLoading +@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store", "hadoop" }) +@CapabilityDescription("Provides a Database Connection Pooling Service for Hadoop related JDBC services. This service requires that " + + "the Database Driver Location(s) contains some version of a hadoop-common JAR, or a shaded JAR that shades hadoop-common.") +@DynamicProperty(name = "The name of a Hadoop configuration property.", value = "The value of the given Hadoop configuration property.", + description = "These properties will be set on the Hadoop configuration after loading any provided configuration files.", + expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY) +public class HadoopDBCPConnectionPool extends AbstractControllerService implements DBCPService { + + private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB"; + + private static final String HADOOP_CONFIGURATION_CLASS = "org.apache.hadoop.conf.Configuration"; + private static final String HADOOP_UGI_CLASS = "org.apache.hadoop.security.UserGroupInformation"; + + private static final String DEFAULT_MIN_IDLE = "0"; + private static final String DEFAULT_MAX_IDLE = "8"; + private static final String DEFAULT_MAX_CONN_LIFETIME = "-1"; + private static final String DEFAULT_EVICTION_RUN_PERIOD = String.valueOf(-1L); + private static final String DEFAULT_MIN_EVICTABLE_IDLE_TIME = "30 mins"; + private static final String DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME = String.valueOf(-1L); + + public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder() + .name("Database Connection URL") + .description("A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters." + + " The exact syntax of a database connection URL is specified by your DBMS.") + .defaultValue(null) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder() + .name("Database Driver Class Name") + .description("Database driver class name") + .defaultValue(null) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor DB_DRIVER_LOCATION = new PropertyDescriptor.Builder() + .name("database-driver-locations") + .displayName("Database Driver Location(s)") + .description("Comma-separated list of files/folders and/or URLs containing the driver JAR and its dependencies (if any). " + + "For example '/var/tmp/phoenix-client.jar'. NOTE: It is required that the resources specified by this property provide " + + "the classes from hadoop-common, such as Configuration and UserGroupInformation.") + .defaultValue(null) + .required(true) + .addValidator(StandardValidators.createListValidator(true, true, StandardValidators.createURLorFileValidator())) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .dynamicallyModifiesClasspath(true) + .build(); + + static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder() + .name("hadoop-config-resources") + .displayName("Hadoop Configuration Resources") + .description("A file, or comma separated list of files, which contain the Hadoop configuration (core-site.xml, etc.). Without this, Hadoop " + + "will search the classpath, or will revert to a default configuration. Note that to enable authentication with Kerberos, " + + "the appropriate properties must be set in the configuration files.") + .required(false) + .addValidator(StandardValidators.createListValidator(true, true, StandardValidators.createURLorFileValidator())) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .dynamicallyModifiesClasspath(true) + .build(); + + public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder() + .name("Database User") + .description("The user for the database") + .defaultValue(null) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder() + .name("Password") + .description("The password for the database user") + .defaultValue(null) + .required(false) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder() + .name("Max Wait Time") + .description("The maximum amount of time that the pool will wait (when there are no available connections) " + + " for a connection to be returned before failing, or -1 to wait indefinitely. ") + .defaultValue("500 millis") + .required(true) + .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .sensitive(false) + .build(); + + public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder() + .name("Max Total Connections") + .description("The maximum number of active connections that can be allocated from this pool at the same time, " + + " or negative for no limit.") + .defaultValue("8") + .required(true) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .sensitive(false) + .build(); + + public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder() + .name("Validation-query") + .displayName("Validation query") + .description("Validation query used to validate connections before returning them. " + + "When connection is invalid, it get's dropped and new valid connection will be returned. " + + "Note!! Using validation might have some performance penalty.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder() + .displayName("Minimum Idle Connections") + .name("dbcp-min-idle-conns") + .description("The minimum number of connections that can remain idle in the pool, without extra ones being " + + "created, or zero to create none.") + .defaultValue(DEFAULT_MIN_IDLE) + .required(false) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor MAX_IDLE = new PropertyDescriptor.Builder() + .displayName("Max Idle Connections") + .name("dbcp-max-idle-conns") + .description("The maximum number of connections that can remain idle in the pool, without extra ones being " + + "released, or negative for no limit.") + .defaultValue(DEFAULT_MAX_IDLE) + .required(false) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder() + .displayName("Max Connection Lifetime") + .name("dbcp-max-conn-lifetime") + .description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " + + "connection will fail the next activation, passivation or validation test. A value of zero or less " + + "means the connection has an infinite lifetime.") + .defaultValue(DEFAULT_MAX_CONN_LIFETIME) + .required(false) + .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor EVICTION_RUN_PERIOD = new PropertyDescriptor.Builder() + .displayName("Time Between Eviction Runs") + .name("dbcp-time-between-eviction-runs") + .description("The number of milliseconds to sleep between runs of the idle connection evictor thread. When " + + "non-positive, no idle connection evictor thread will be run.") + .defaultValue(DEFAULT_EVICTION_RUN_PERIOD) + .required(false) + .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder() + .displayName("Minimum Evictable Idle Time") + .name("dbcp-min-evictable-idle-time") + .description("The minimum amount of time a connection may sit idle in the pool before it is eligible for eviction.") + .defaultValue(DEFAULT_MIN_EVICTABLE_IDLE_TIME) + .required(false) + .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor SOFT_MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder() + .displayName("Soft Minimum Evictable Idle Time") + .name("dbcp-soft-min-evictable-idle-time") + .description("The minimum amount of time a connection may sit idle in the pool before it is eligible for " + + "eviction by the idle connection evictor, with the extra condition that at least a minimum number of" + + " idle connections remain in the pool. When the not-soft version of this option is set to a positive" + + " value, it is examined first by the idle connection evictor: when idle connections are visited by " + + "the evictor, idle time is first compared against it (without considering the number of idle " + + "connections in the pool) and then against this soft option, including the minimum idle connections " + + "constraint.") + .defaultValue(DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME) + .required(false) + .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() + .name("kerberos-credentials-service") + .displayName("Kerberos Credentials Service") + .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos") + .identifiesControllerService(KerberosCredentialsService.class) + .required(false) + .build(); + + + private File kerberosConfigFile = null; + private KerberosProperties kerberosProperties; + private List properties; + + private volatile BasicDataSource dataSource; + private volatile UserGroupInformation ugi; + private volatile KerberosUser kerberosUser; + private volatile Boolean foundHadoopDependencies; + + // Holder of cached Configuration information so validation does not reload the same config over and over + private final AtomicReference validationResourceHolder = new AtomicReference<>(null); + + @Override + protected void init(final ControllerServiceInitializationContext context) { + kerberosConfigFile = context.getKerberosConfigurationFile(); + kerberosProperties = getKerberosProperties(kerberosConfigFile); + + final List props = new ArrayList<>(); + props.add(DATABASE_URL); + props.add(DB_DRIVERNAME); + props.add(DB_DRIVER_LOCATION); + props.add(HADOOP_CONFIGURATION_RESOURCES); + props.add(KERBEROS_CREDENTIALS_SERVICE); + props.add(kerberosProperties.getKerberosPrincipal()); + props.add(kerberosProperties.getKerberosKeytab()); + props.add(kerberosProperties.getKerberosPassword()); + props.add(DB_USER); + props.add(DB_PASSWORD); + props.add(MAX_WAIT_TIME); + props.add(MAX_TOTAL_CONNECTIONS); + props.add(VALIDATION_QUERY); + props.add(MIN_IDLE); + props.add(MAX_IDLE); + props.add(MAX_CONN_LIFETIME); + props.add(EVICTION_RUN_PERIOD); + props.add(MIN_EVICTABLE_IDLE_TIME); + props.add(SOFT_MIN_EVICTABLE_IDLE_TIME); + + properties = Collections.unmodifiableList(props); + } + + protected KerberosProperties getKerberosProperties(File kerberosConfigFile) { + return new KerberosProperties(kerberosConfigFile); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .dynamic(true) + .build(); + } + + @Override + protected Collection customValidate(ValidationContext validationContext) { + final List problems = new ArrayList<>(); + + // Determine if we need to validate the presence of the required hadoop dependencies... + if (foundHadoopDependencies == null) { + final ClassLoader classLoader = this.getClass().getClassLoader(); + try { + Class.forName(HADOOP_CONFIGURATION_CLASS, true, classLoader); + Class.forName(HADOOP_UGI_CLASS, true, classLoader); + foundHadoopDependencies = true; + } catch (ClassNotFoundException cnf) { + getLogger().debug(cnf.getMessage(), cnf); + foundHadoopDependencies = false; + } + } + + // If hadoop classes are missing then we can't perform the rest of the validation, so short circuit and return... + if (!foundHadoopDependencies) { + problems.add(new ValidationResult.Builder() + .subject(DB_DRIVER_LOCATION.getDisplayName()) + .valid(false) + .explanation("required Hadoop classes were not found in any of the specified resources, please ensure that hadoop-common is available") + .build()); + return problems; + } + + // Hadoop classes were found, so proceed with the rest of validation... + + final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); + final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); + final String explicitPassword = validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue(); + final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + + final String resolvedPrincipal; + final String resolvedKeytab; + if (credentialsService == null) { + resolvedPrincipal = explicitPrincipal; + resolvedKeytab = explicitKeytab; + } else { + resolvedPrincipal = credentialsService.getPrincipal(); + resolvedKeytab = credentialsService.getKeytab(); + } + + final boolean confFileProvided = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet(); + if (confFileProvided) { + final String configFiles = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue(); + ValidationResources resources = validationResourceHolder.get(); + + // if no resources in the holder, or if the holder has different resources loaded, + // then load the Configuration and set the new resources in the holder + if (resources == null || !configFiles.equals(resources.getConfigResources())) { + getLogger().debug("Reloading validation resources"); + resources = new ValidationResources(configFiles, getConfigurationFromFiles(configFiles)); + validationResourceHolder.set(resources); + } + + final Configuration hadoopConfig = resources.getConfiguration(); + + problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(getClass().getSimpleName(), hadoopConfig, + resolvedPrincipal, resolvedKeytab, explicitPassword, getLogger())); + } + + if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null || explicitPassword != null)) { + problems.add(new ValidationResult.Builder() + .subject("Kerberos Credentials") + .valid(false) + .explanation("Cannot specify a Kerberos Credentials Service while also specifying a Kerberos Principal, Kerberos Keytab, or Kerberos Password") + .build()); + } + + if (!isAllowExplicitKeytab() && explicitKeytab != null) { + problems.add(new ValidationResult.Builder() + .subject("Kerberos Credentials") + .valid(false) + .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring Kerberos Keytab in processors. " + + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.") + .build()); + } + + return problems; + } + + protected Configuration getConfigurationFromFiles(final String configFiles) { + final Configuration conf = new Configuration(); + if (StringUtils.isNotBlank(configFiles)) { + for (final String configFile : configFiles.split(",")) { + conf.addResource(new Path(configFile.trim())); + } + } + return conf; + } + + /** + * Configures connection pool by creating an instance of the + * {@link BasicDataSource} based on configuration provided with + * {@link ConfigurationContext}. + * + * This operation makes no guarantees that the actual connection could be + * made since the underlying system may still go off-line during normal + * operation of the connection pool. + * + * @param context + * the configuration context + * @throws InitializationException + * if unable to create a database connection + */ + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws IOException { + // Get Configuration instance from specified resources + final String configFiles = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue(); + final Configuration hadoopConfig = getConfigurationFromFiles(configFiles); + + // Add any dynamic properties to the HBase Configuration + for (final Map.Entry entry : context.getProperties().entrySet()) { + final PropertyDescriptor descriptor = entry.getKey(); + if (descriptor.isDynamic()) { + hadoopConfig.set(descriptor.getName(), context.getProperty(descriptor).evaluateAttributeExpressions().getValue()); + } + } + + // If security is enabled then determine how to authenticate based on the various principal/keytab/password options + if (SecurityUtil.isSecurityEnabled(hadoopConfig)) { + final String explicitPrincipal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue(); + final String explicitKeytab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue(); + final String explicitPassword = context.getProperty(kerberosProperties.getKerberosPassword()).getValue(); + final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + + final String resolvedPrincipal; + final String resolvedKeytab; + if (credentialsService != null) { + resolvedPrincipal = credentialsService.getPrincipal(); + resolvedKeytab = credentialsService.getKeytab(); + } else { + resolvedPrincipal = explicitPrincipal; + resolvedKeytab = explicitKeytab; + } + + if (resolvedKeytab != null) { + kerberosUser = new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab); + getLogger().info("Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab}); + } else if (explicitPassword != null) { + kerberosUser = new KerberosPasswordUser(resolvedPrincipal, explicitPassword); + getLogger().info("Security Enabled, logging in as principal {} with password", new Object[] {resolvedPrincipal}); + } else { + throw new IOException("Unable to authenticate with Kerberos, no keytab or password was provided"); + } + + ugi = SecurityUtil.getUgiForKerberosUser(hadoopConfig, kerberosUser); + getLogger().info("Successfully logged in as principal " + resolvedPrincipal); + } else { + getLogger().info("Simple Authentication"); + } + + // Initialize the DataSource... + final String dbUrl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue(); + final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue(); + final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue(); + final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue(); + final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger(); + final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue(); + final Long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions()); + final Integer minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger(); + final Integer maxIdle = context.getProperty(MAX_IDLE).evaluateAttributeExpressions().asInteger(); + final Long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions()); + final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD).evaluateAttributeExpressions()); + final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions()); + final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions()); + + dataSource = new BasicDataSource(); + dataSource.setDriverClassName(driverName); + dataSource.setDriverClassLoader(this.getClass().getClassLoader()); + dataSource.setUrl(dbUrl); + dataSource.setUsername(user); + dataSource.setPassword(passw); + dataSource.setMaxWaitMillis(maxWaitMillis); + dataSource.setMaxTotal(maxTotal); + dataSource.setMinIdle(minIdle); + dataSource.setMaxIdle(maxIdle); + dataSource.setMaxConnLifetimeMillis(maxConnLifetimeMillis); + dataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis); + dataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis); + dataSource.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis); + + if (StringUtils.isEmpty(validationQuery)) { + dataSource.setValidationQuery(validationQuery); + dataSource.setTestOnBorrow(true); + } + } + + private Long extractMillisWithInfinite(PropertyValue prop) { + return "-1".equals(prop.getValue()) ? -1 : prop.asTimePeriod(TimeUnit.MILLISECONDS); + } + + /** + * Shutdown pool, close all open connections. + * If a principal is authenticated with a KDC, that principal is logged out. + * + * If a @{@link LoginException} occurs while attempting to log out the @{@link org.apache.nifi.security.krb.KerberosUser}, + * an attempt will still be made to shut down the pool and close open connections. + * + * @throws SQLException if there is an error while closing open connections + * @throws LoginException if there is an error during the principal log out, and will only be thrown if there was + * no exception while closing open connections + */ + @OnDisabled + public void shutdown() throws SQLException, LoginException { + try { + if (kerberosUser != null) { + kerberosUser.logout(); + } + } finally { + validationResourceHolder.set(null); + foundHadoopDependencies = null; + kerberosUser = null; + ugi = null; + try { + if (dataSource != null) { + dataSource.close(); + } + } finally { + dataSource = null; + } + } + } + + @Override + public Connection getConnection() throws ProcessException { + try { + if (ugi != null) { + // Explicitly check the TGT and relogin if necessary with the KerberosUser instance. No synchronization + // is necessary in the client code, since AbstractKerberosUser's checkTGTAndRelogin method is synchronized. + getLogger().trace("getting UGI instance"); + if (kerberosUser != null) { + // if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring + getLogger().debug("kerberosUser is " + kerberosUser); + try { + getLogger().debug("checking TGT on kerberosUser " + kerberosUser); + kerberosUser.checkTGTAndRelogin(); + } catch (LoginException e) { + throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e); + } + } else { + getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser"); + // no synchronization is needed for UserGroupInformation.checkTGTAndReloginFromKeytab; UGI handles the synchronization internally + ugi.checkTGTAndReloginFromKeytab(); + } + + try { + return ugi.doAs((PrivilegedExceptionAction) () -> dataSource.getConnection()); + } catch (UndeclaredThrowableException e) { + Throwable cause = e.getCause(); + if (cause instanceof SQLException) { + throw (SQLException) cause; + } else { + throw e; + } + } + } else { + getLogger().info("Simple Authentication"); + return dataSource.getConnection(); + } + } catch (SQLException | IOException | InterruptedException e) { + getLogger().error("Error getting Connection: " + e.getMessage(), e); + throw new ProcessException(e); + } + } + + @Override + public String toString() { + return "HadoopDBCPConnectionPool[id=" + getIdentifier() + "]"; + } + + /* + * Overridable by subclasses in the same package, mainly intended for testing purposes to allow verification without having to set environment variables. + */ + boolean isAllowExplicitKeytab() { + return Boolean.parseBoolean(System.getenv(ALLOW_EXPLICIT_KEYTAB)); + } + + BasicDataSource getDataSource() { + return dataSource; + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/java/org/apache/nifi/dbcp/ValidationResources.java b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/java/org/apache/nifi/dbcp/ValidationResources.java new file mode 100644 index 0000000000..0ec0ed82fd --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/java/org/apache/nifi/dbcp/ValidationResources.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.dbcp; + +import org.apache.hadoop.conf.Configuration; + +public class ValidationResources { + + private final String configResources; + private final Configuration configuration; + + public ValidationResources(String configResources, Configuration configuration) { + this.configResources = configResources; + this.configuration = configuration; + } + + public String getConfigResources() { + return configResources; + } + + public Configuration getConfiguration() { + return configuration; + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000000..190177046c --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.dbcp.HadoopDBCPConnectionPool \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPoolTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPoolTest.java new file mode 100644 index 0000000000..644c66995f --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/HadoopDBCPConnectionPoolTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.dbcp; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.hadoop.KerberosProperties; +import org.apache.nifi.kerberos.KerberosContext; +import org.apache.nifi.kerberos.KerberosCredentialsService; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockKerberosContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; + +public class HadoopDBCPConnectionPoolTest { + + private File krbConfFile; + private KerberosProperties kerberosProps; + private KerberosContext kerberosContext; + + @Before + public void setup() { + krbConfFile = new File("src/test/resources/krb5.conf"); + kerberosProps = new KerberosProperties(krbConfFile); + kerberosContext = new MockKerberosContext(krbConfFile); + } + + @Test + public void testCustomValidateWhenAllowExplicitKeytab() throws InitializationException { + final Processor testProcessor = new TestProcessor(); + final TestRunner runner = TestRunners.newTestRunner(testProcessor, kerberosContext); + + // Configure minimum required properties.. + final HadoopDBCPConnectionPool hadoopDBCPService = new TestableHadoopDBCPConnectionPool(true); + runner.addControllerService("hadoop-dbcp-service", hadoopDBCPService); + runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DATABASE_URL, "jdbc:phoenix:zk-host1,zk-host2:2181:/hbase"); + runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DB_DRIVERNAME, "org.apache.phoenix.jdbc.PhoenixDriver"); + runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DB_DRIVER_LOCATION, "target"); + + // Security is not enabled yet since no conf files provided, so should be valid + runner.assertValid(hadoopDBCPService); + + // Enable security, should be invalid until some form of kerberos credentials are provided + runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site-security.xml"); + runner.assertNotValid(hadoopDBCPService); + + // Configure principal and keytab, should be valid + runner.setProperty(hadoopDBCPService, kerberosProps.getKerberosPrincipal(), "nifi@EXAMPLE.COM"); + runner.setProperty(hadoopDBCPService, kerberosProps.getKerberosKeytab(), "src/test/resources/fake.keytab"); + runner.assertValid(hadoopDBCPService); + + // Configure password, should become invalid + runner.setProperty(hadoopDBCPService, kerberosProps.getKerberosPassword(), "password"); + runner.assertNotValid(hadoopDBCPService); + + // Remove keytab property, should become valid + runner.removeProperty(hadoopDBCPService, kerberosProps.getKerberosKeytab()); + runner.assertValid(hadoopDBCPService); + + // Configure a KeberosCredentialService, should become invalid + final KerberosCredentialsService kerberosCredentialsService = new MockKerberosCredentialsService( + "nifi@EXAMPLE.COM", "src/test/resources/fake.keytab"); + runner.addControllerService("kerb-credentials", kerberosCredentialsService); + runner.enableControllerService(kerberosCredentialsService); + runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE, "kerb-credentials"); + runner.assertNotValid(hadoopDBCPService); + + // Remove password property, still invalid + runner.removeProperty(hadoopDBCPService, kerberosProps.getKerberosPassword()); + runner.assertNotValid(hadoopDBCPService); + + // Remove principal property, only using keytab service, should become valid + runner.removeProperty(hadoopDBCPService, kerberosProps.getKerberosPrincipal()); + runner.assertValid(hadoopDBCPService); + } + + @Test + public void testCustomValidateWhenNotAllowExplicitKeytab() throws InitializationException { + final Processor testProcessor = new TestProcessor(); + final TestRunner runner = TestRunners.newTestRunner(testProcessor, kerberosContext); + + // Configure minimum required properties.. + final HadoopDBCPConnectionPool hadoopDBCPService = new TestableHadoopDBCPConnectionPool(false); + runner.addControllerService("hadoop-dbcp-service", hadoopDBCPService); + runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DATABASE_URL, "jdbc:phoenix:zk-host1,zk-host2:2181:/hbase"); + runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DB_DRIVERNAME, "org.apache.phoenix.jdbc.PhoenixDriver"); + runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DB_DRIVER_LOCATION, "target"); + + // Security is not enabled yet since no conf files provided, so should be valid + runner.assertValid(hadoopDBCPService); + + // Enable security, should be invalid until some form of kerberos credentials are provided + runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site-security.xml"); + runner.assertNotValid(hadoopDBCPService); + + // Configure principal and keytab, should be valid + runner.setProperty(hadoopDBCPService, kerberosProps.getKerberosPrincipal(), "nifi@EXAMPLE.COM"); + runner.assertNotValid(hadoopDBCPService); + } + + private static final class TestableHadoopDBCPConnectionPool extends HadoopDBCPConnectionPool { + + private final boolean allowExplicitKeytab; + + public TestableHadoopDBCPConnectionPool(boolean allowExplicitKeytab) { + this.allowExplicitKeytab = allowExplicitKeytab; + } + + @Override + boolean isAllowExplicitKeytab() { + return allowExplicitKeytab; + } + } + + private class MockKerberosCredentialsService extends AbstractControllerService implements KerberosCredentialsService { + + private String principal; + private String keytab; + + public MockKerberosCredentialsService(String principal, String keytab) { + this.principal = principal; + this.keytab = keytab; + } + + @Override + public String getKeytab() { + return keytab; + } + + @Override + public String getPrincipal() { + return principal; + } + } + +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestProcessor.java b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestProcessor.java new file mode 100644 index 0000000000..cadeee9281 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestProcessor.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.dbcp; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +public class TestProcessor extends AbstractProcessor { + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + } + + @Override + protected List getSupportedPropertyDescriptors() { + List propDescs = new ArrayList<>(); + propDescs.add(new PropertyDescriptor.Builder() + .name("DBCPService test processor") + .description("DBCPService test processor") + .identifiesControllerService(DBCPService.class) + .required(true) + .build()); + return propDescs; + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/resources/core-site-security.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/resources/core-site-security.xml new file mode 100644 index 0000000000..2aca105f9b --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/resources/core-site-security.xml @@ -0,0 +1,30 @@ + + + + + + fs.default.name + hdfs://hbase + + + hadoop.security.authentication + kerberos + + + hadoop.security.authorization + true + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/resources/core-site.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/resources/core-site.xml new file mode 100644 index 0000000000..c044ee30da --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/resources/core-site.xml @@ -0,0 +1,22 @@ + + + + + + fs.default.name + hdfs://hbase + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/resources/fake.keytab b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/resources/fake.keytab new file mode 100644 index 0000000000..e69de29bb2 diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/resources/krb5.conf b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/resources/krb5.conf new file mode 100644 index 0000000000..814d5b2fb8 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/nifi-hadoop-dbcp-service/src/test/resources/krb5.conf @@ -0,0 +1,12 @@ +[libdefaults] + default_realm = EXAMPLE.COM + +[realms] + EXAMPLE.COM = { + kdc = kdc1.example.com + kdc = kdc2.example.com + admin_server = kdc1.example.com + } + +[domain_realm] + .example.com = EXAMPLE.COM \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/pom.xml new file mode 100644 index 0000000000..dd7e26b4cb --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hadoop-dbcp-service-bundle/pom.xml @@ -0,0 +1,29 @@ + + + 4.0.0 + + org.apache.nifi + nifi-standard-services + 1.12.0-SNAPSHOT + + + nifi-hadoop-dbcp-service-bundle + pom + + nifi-hadoop-dbcp-service + nifi-hadoop-dbcp-service-nar + + diff --git a/nifi-nar-bundles/nifi-standard-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/pom.xml index 2ee9228bdc..aecf01ef31 100644 --- a/nifi-nar-bundles/nifi-standard-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/pom.xml @@ -49,5 +49,6 @@ nifi-rules-engine-service-api nifi-record-sink-api nifi-record-sink-service-bundle + nifi-hadoop-dbcp-service-bundle