NIFI-7257 Added HadoopDBCPConnectionPool

- Updated InstanceClassLoader to resolve files that are in the instance URLs or additional URLs
- Updated nifi-mock to support KerberosContext and removeProperty for ControllerServices
- Added unit test for HadoopDBCPConnectionPool
- Addressing review feedback

This closes #4149.
Bryan Bende, 2020-03-12 15:57:11 -04:00
commit 6412097eb2 (parent 7f32aa56db)
22 changed files with 1605 additions and 40 deletions


@@ -277,6 +277,12 @@ language governing permissions and limitations under the License. -->
<version>1.12.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-dbcp-service-nar</artifactId>
<version>1.12.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mongodb-client-service-api-nar</artifactId>


@@ -16,15 +16,17 @@
*/
package org.apache.nifi.util;
import java.io.File;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.kerberos.KerberosContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.state.MockStateManager;
import java.io.File;
public class MockControllerServiceInitializationContext extends MockControllerServiceLookup implements ControllerServiceInitializationContext, ControllerServiceLookup, NodeTypeProvider {
private final String identifier;
@@ -32,6 +34,7 @@ public class MockControllerServiceInitializationContext extends MockControllerServiceLookup
private final StateManager stateManager;
private volatile boolean isClustered;
private volatile boolean isPrimaryNode;
private final KerberosContext kerberosContext;
public MockControllerServiceInitializationContext(final ControllerService controllerService, final String identifier) {
this(controllerService, identifier, new MockStateManager(controllerService));
@@ -42,9 +45,15 @@ public class MockControllerServiceInitializationContext extends MockControllerServiceLookup
}
public MockControllerServiceInitializationContext(final ControllerService controllerService, final String identifier, final ComponentLog logger, final StateManager stateManager) {
this(controllerService, identifier, logger, stateManager, null);
}
public MockControllerServiceInitializationContext(final ControllerService controllerService, final String identifier, final ComponentLog logger, final StateManager stateManager,
final KerberosContext kerberosContext) {
this.identifier = identifier;
this.logger = logger;
this.stateManager = stateManager;
this.kerberosContext = kerberosContext;
addControllerService(controllerService, identifier);
}
@@ -80,17 +89,17 @@ public class MockControllerServiceInitializationContext extends MockControllerServiceLookup
@Override
public String getKerberosServicePrincipal() {
return null; //this needs to be wired in.
return kerberosContext != null ? kerberosContext.getKerberosServicePrincipal() : null;
}
@Override
public File getKerberosServiceKeytab() {
return null; //this needs to be wired in.
return kerberosContext != null ? kerberosContext.getKerberosServiceKeytab() : null;
}
@Override
public File getKerberosConfigurationFile() {
return null; //this needs to be wired in.
return kerberosContext != null ? kerberosContext.getKerberosConfigurationFile() : null;
}
@Override


@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
import org.apache.nifi.kerberos.KerberosContext;
import java.io.File;
public class MockKerberosContext implements KerberosContext {
private final String kerberosServicePrincipal;
private final File kerberosServiceKeytab;
private final File kerberosConfigFile;
public MockKerberosContext(final File kerberosConfigFile) {
this(null, null, kerberosConfigFile);
}
public MockKerberosContext(final String kerberosServicePrincipal, final File kerberosServiceKeytab, final File kerberosConfigFile) {
this.kerberosServicePrincipal = kerberosServicePrincipal;
this.kerberosServiceKeytab = kerberosServiceKeytab;
this.kerberosConfigFile = kerberosConfigFile;
}
@Override
public String getKerberosServicePrincipal() {
return kerberosServicePrincipal;
}
@Override
public File getKerberosServiceKeytab() {
return kerberosServiceKeytab;
}
@Override
public File getKerberosConfigurationFile() {
return kerberosConfigFile;
}
}
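
For illustration, a minimal sketch (not part of this commit) of threading MockKerberosContext through the five-argument MockControllerServiceInitializationContext constructor added above. The test class name, the anonymous no-op service, and the krb5.conf path are assumptions; the constructors and getters are the ones shown in this diff:

package org.apache.nifi.util;

import java.io.File;

import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.kerberos.KerberosContext;
import org.apache.nifi.state.MockStateManager;
import org.junit.Assert;
import org.junit.Test;

public class MockKerberosContextExampleTest {

    @Test
    public void testKerberosContextVisibleToControllerService() throws Exception {
        // Any ControllerService works here; an anonymous no-op service keeps the sketch self-contained.
        final ControllerService service = new AbstractControllerService() {};
        final String id = "my-service";

        // Only a krb5.conf is supplied, so principal and keytab default to null (hypothetical path).
        final KerberosContext kerberosContext = new MockKerberosContext(new File("src/test/resources/krb5.conf"));

        // The five-argument constructor added in this commit threads the context through.
        final MockControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(
                service, id, new MockComponentLog(id, service), new MockStateManager(service), kerberosContext);
        service.initialize(initContext);

        Assert.assertNull(initContext.getKerberosServicePrincipal());
        Assert.assertNull(initContext.getKerberosServiceKeytab());
        Assert.assertNotNull(initContext.getKerberosConfigurationFile());
    }
}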


@@ -16,34 +16,6 @@
*/
package org.apache.nifi.util;
import static java.util.Objects.requireNonNull;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
@@ -74,6 +46,34 @@ import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.state.MockStateManager;
import org.junit.Assert;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import static java.util.Objects.requireNonNull;
public class StandardProcessorTestRunner implements TestRunner {
private final Processor processor;
@@ -618,7 +618,8 @@ public class StandardProcessorTestRunner implements TestRunner {
final MockComponentLog logger = new MockComponentLog(identifier, service);
controllerServiceLoggers.put(identifier, logger);
final MockStateManager serviceStateManager = new MockStateManager(service);
final MockControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(requireNonNull(service), requireNonNull(identifier), logger, serviceStateManager);
final MockControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(
requireNonNull(service), requireNonNull(identifier), logger, serviceStateManager, kerberosContext);
controllerServiceStateManagers.put(identifier, serviceStateManager);
initContext.addControllerServices(context);
service.initialize(initContext);
@@ -839,6 +840,36 @@
return context.removeProperty(property);
}
@Override
public boolean removeProperty(final ControllerService service, final PropertyDescriptor property) {
final MockStateManager serviceStateManager = controllerServiceStateManagers.get(service.getIdentifier());
if (serviceStateManager == null) {
throw new IllegalStateException("Controller service " + service + " has not been added to this TestRunner via the #addControllerService method");
}
final ControllerServiceConfiguration configuration = getConfigToUpdate(service);
final Map<PropertyDescriptor, String> curProps = configuration.getProperties();
final Map<PropertyDescriptor, String> updatedProps = new HashMap<>(curProps);
final String oldValue = updatedProps.remove(property);
if (oldValue == null) {
return false;
}
configuration.setProperties(updatedProps);
service.onPropertyModified(property, oldValue, null);
return true;
}
@Override
public boolean removeProperty(ControllerService service, String propertyName) {
final PropertyDescriptor descriptor = service.getPropertyDescriptor(propertyName);
if (descriptor == null) {
return false;
}
return removeProperty(service, descriptor);
}
@Override
public void clearProperties() {
context.clearProperties();


@@ -16,13 +16,6 @@
*/
package org.apache.nifi.util;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
@@ -38,6 +31,13 @@ import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.state.MockStateManager;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
public interface TestRunner {
/**
@@ -855,6 +855,16 @@
*/
boolean removeProperty(PropertyDescriptor descriptor);
/**
* Removes the {@link PropertyDescriptor} from the ControllerService,
* effectively setting its value to null, or the property's default value, if it has one.
*
* @param controllerService the controller service to remove the property from
* @param descriptor of property to remove
* @return <code>true</code> if removed, <code>false</code> if the property was not set
*/
boolean removeProperty(ControllerService controllerService, PropertyDescriptor descriptor);
/**
* Removes the property from the {@link ProcessContext},
* effectively setting its value to null, or the property's default value, if it has one.
@@ -864,6 +874,16 @@
*/
boolean removeProperty(String property);
/**
* Removes the property with the given name from the ControllerService,
* effectively setting its value to null, or the property's default value, if it has one.
*
* @param controllerService the controller service to remove the property from
* @param property name of the property to remove
* @return <code>true</code> if removed, <code>false</code> if the property was not set
*/
boolean removeProperty(ControllerService controllerService, String property);
/**
* Clears all set properties from the {@link ProcessContext}.
*/
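
A short sketch (illustrative, not from this commit's test suite) of the new controller-service overloads in use, exercised against the HadoopDBCPConnectionPool service this commit adds. The test class name and the anonymous no-op processor are assumptions; TestRunners.newTestRunner, addControllerService, and setProperty are existing nifi-mock API:

package org.apache.nifi.util;

import org.apache.nifi.dbcp.HadoopDBCPConnectionPool;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.junit.Assert;
import org.junit.Test;

public class RemovePropertyExampleTest {

    @Test
    public void testRemovePropertyFromControllerService() throws Exception {
        // A do-nothing processor is enough to obtain a TestRunner.
        final TestRunner runner = TestRunners.newTestRunner(new AbstractProcessor() {
            @Override
            public void onTrigger(final ProcessContext context, final ProcessSession session) {
            }
        });

        final HadoopDBCPConnectionPool service = new HadoopDBCPConnectionPool();
        runner.addControllerService("dbcp", service);

        // Set an optional property, then remove it by descriptor; removing a set property returns true.
        runner.setProperty(service, HadoopDBCPConnectionPool.DB_USER, "nifi");
        Assert.assertTrue(runner.removeProperty(service, HadoopDBCPConnectionPool.DB_USER));

        // The String overload resolves the descriptor by name; removing an unset property returns false.
        Assert.assertFalse(runner.removeProperty(service, "Database User"));
    }
}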


@@ -20,6 +20,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
@@ -72,6 +73,48 @@ public class InstanceClassLoader extends AbstractNativeLibHandlingClassLoader {
additionalResourceUrls == null ? Collections.emptySet() : new LinkedHashSet<>(additionalResourceUrls));
}
/**
* Note: Normally URLClassLoader will only load resources that are inside JARs or directories, but we often allow
* properties to specify individual files to add to the classpath. This override allows those files to be found by checking
* the known URLs of the InstanceClassLoader when the resource was not found in the parent hierarchy.
*/
@Override
public URL findResource(String name) {
URL resourceUrl = super.findResource(name);
if (resourceUrl == null) {
resourceUrl = findResource(instanceUrls, name);
}
if (resourceUrl == null) {
resourceUrl = findResource(additionalResourceUrls, name);
}
return resourceUrl;
}
private URL findResource(final Set<URL> urls, final String name) {
if (urls == null || name == null) {
return null;
}
for (final URL url : urls) {
try {
final URI uri = url.toURI();
final File file = new File(uri);
if (name.equals(file.getName())) {
logger.debug("Found resource '" + name + "' from URL '" + url.toExternalForm() + "'");
return url;
}
} catch (URISyntaxException e) {
logger.error(e.getMessage(), e);
return null;
}
}
return null;
}
private static List<File> initNativeLibDirList(Set<File> narNativeLibDirs, Set<URL> additionalResourceUrls) {
List<File> nativeLibDirList = new ArrayList<>(narNativeLibDirs);


@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.nar;
import org.junit.Test;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collections;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertEquals;
public class TestInstanceClassLoader {
@Test
public void testFindResourceWhenFile() throws MalformedURLException {
final File nifiProperties = new File("src/test/resources/nifi.properties");
assertTrue(nifiProperties.exists());
final URL nifiPropertiesURL = nifiProperties.toURI().toURL();
final ClassLoader instanceClassLoader = new InstanceClassLoader(
"id",
"org.apache.nifi.processors.MyProcessor",
Collections.emptySet(),
Collections.singleton(nifiPropertiesURL),
null);
final URL nifiPropertiesResource = instanceClassLoader.getResource(nifiProperties.getName());
assertNotNull(nifiPropertiesResource);
assertEquals(nifiPropertiesURL.toExternalForm(), nifiPropertiesResource.toExternalForm());
final URL doesNotExistResource = instanceClassLoader.getResource("does-not-exist.txt");
assertNull(doesNotExistResource);
}
}


@@ -0,0 +1,42 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-dbcp-service-bundle</artifactId>
<version>1.12.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-hadoop-dbcp-service-nar</artifactId>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>1.12.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-dbcp-service</artifactId>
<version>1.12.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>


@@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


@@ -0,0 +1,41 @@
nifi-hadoop-dbcp-service-nar
Copyright 2014-2020 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
===========================================
Apache Software License v2
===========================================
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2015 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
(ASLv2) Apache Commons DBCP
The following NOTICE information applies:
Apache Commons DBCP
Copyright 2001-2018 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
(ASLv2) Apache Commons Pool
The following NOTICE information applies:
Apache Commons Pool
Copyright 2001-2018 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
(ASLv2) Apache Derby
The following NOTICE information applies:
Apache Derby
Copyright 2004-2014 Apache, Apache DB, Apache Derby, Apache Torque, Apache JDO, Apache DDLUtils,
the Derby hat logo, the Apache JDO logo, and the Apache feather logo are trademarks of The Apache Software Foundation.


@@ -0,0 +1,110 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-dbcp-service-bundle</artifactId>
<version>1.12.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-hadoop-dbcp-service</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
<version>1.12.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.12.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-service-utils</artifactId>
<version>1.12.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId>
<version>1.12.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-utils</artifactId>
<version>1.12.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
<version>1.12.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>2.7.0</version>
</dependency>
<!-- We are compiling against hadoop-common to make use of UserGroupInformation, but we don't want to bundle it
into the NAR; users are expected to provide hadoop-common on the driver classpath, or a shaded JAR that includes it -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Test Dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.12.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
<version>1.12.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes combine.children="append">
<exclude>src/test/resources/fake.keytab</exclude>
<exclude>src/test/resources/krb5.conf</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>


@@ -0,0 +1,607 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dbcp;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
import javax.security.auth.login.LoginException;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* Implementation of a Database Connection Pooling Service for Hadoop-related JDBC services.
* Apache Commons DBCP is used for connection pooling functionality.
*/
@RequiresInstanceClassLoading
@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store", "hadoop" })
@CapabilityDescription("Provides a Database Connection Pooling Service for Hadoop related JDBC services. This service requires that " +
"the Database Driver Location(s) contains some version of a hadoop-common JAR, or a shaded JAR that shades hadoop-common.")
@DynamicProperty(name = "The name of a Hadoop configuration property.", value = "The value of the given Hadoop configuration property.",
description = "These properties will be set on the Hadoop configuration after loading any provided configuration files.",
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
public class HadoopDBCPConnectionPool extends AbstractControllerService implements DBCPService {
private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
private static final String HADOOP_CONFIGURATION_CLASS = "org.apache.hadoop.conf.Configuration";
private static final String HADOOP_UGI_CLASS = "org.apache.hadoop.security.UserGroupInformation";
private static final String DEFAULT_MIN_IDLE = "0";
private static final String DEFAULT_MAX_IDLE = "8";
private static final String DEFAULT_MAX_CONN_LIFETIME = "-1";
private static final String DEFAULT_EVICTION_RUN_PERIOD = String.valueOf(-1L);
private static final String DEFAULT_MIN_EVICTABLE_IDLE_TIME = "30 mins";
private static final String DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME = String.valueOf(-1L);
public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
.name("Database Connection URL")
.description("A database connection URL used to connect to a database. May contain database system name, host, port, database name and some parameters."
+ " The exact syntax of a database connection URL is specified by your DBMS.")
.defaultValue(null)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor DB_DRIVERNAME = new PropertyDescriptor.Builder()
.name("Database Driver Class Name")
.description("Database driver class name")
.defaultValue(null)
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor DB_DRIVER_LOCATION = new PropertyDescriptor.Builder()
.name("database-driver-locations")
.displayName("Database Driver Location(s)")
.description("Comma-separated list of files/folders and/or URLs containing the driver JAR and its dependencies (if any). " +
"For example '/var/tmp/phoenix-client.jar'. NOTE: It is required that the resources specified by this property provide " +
"the classes from hadoop-common, such as Configuration and UserGroupInformation.")
.defaultValue(null)
.required(true)
.addValidator(StandardValidators.createListValidator(true, true, StandardValidators.createURLorFileValidator()))
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dynamicallyModifiesClasspath(true)
.build();
static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
.name("hadoop-config-resources")
.displayName("Hadoop Configuration Resources")
.description("A file, or comma separated list of files, which contain the Hadoop configuration (core-site.xml, etc.). Without this, Hadoop "
+ "will search the classpath, or will revert to a default configuration. Note that to enable authentication with Kerberos, "
+ "the appropriate properties must be set in the configuration files.")
.required(false)
.addValidator(StandardValidators.createListValidator(true, true, StandardValidators.createURLorFileValidator()))
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dynamicallyModifiesClasspath(true)
.build();
public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder()
.name("Database User")
.description("The user for the database")
.defaultValue(null)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor DB_PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.description("The password for the database user")
.defaultValue(null)
.required(false)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MAX_WAIT_TIME = new PropertyDescriptor.Builder()
.name("Max Wait Time")
.description("The maximum amount of time that the pool will wait (when there are no available connections) "
+ " for a connection to be returned before failing, or -1 to wait indefinitely. ")
.defaultValue("500 millis")
.required(true)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.sensitive(false)
.build();
public static final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder()
.name("Max Total Connections")
.description("The maximum number of active connections that can be allocated from this pool at the same time, "
+ " or negative for no limit.")
.defaultValue("8")
.required(true)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.sensitive(false)
.build();
public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder()
.name("Validation-query")
.displayName("Validation query")
.description("Validation query used to validate connections before returning them. "
+ "When connection is invalid, it get's dropped and new valid connection will be returned. "
+ "Note!! Using validation might have some performance penalty.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder()
.displayName("Minimum Idle Connections")
.name("dbcp-min-idle-conns")
.description("The minimum number of connections that can remain idle in the pool, without extra ones being " +
"created, or zero to create none.")
.defaultValue(DEFAULT_MIN_IDLE)
.required(false)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MAX_IDLE = new PropertyDescriptor.Builder()
.displayName("Max Idle Connections")
.name("dbcp-max-idle-conns")
.description("The maximum number of connections that can remain idle in the pool, without extra ones being " +
"released, or negative for no limit.")
.defaultValue(DEFAULT_MAX_IDLE)
.required(false)
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder()
.displayName("Max Connection Lifetime")
.name("dbcp-max-conn-lifetime")
.description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " +
"connection will fail the next activation, passivation or validation test. A value of zero or less " +
"means the connection has an infinite lifetime.")
.defaultValue(DEFAULT_MAX_CONN_LIFETIME)
.required(false)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor EVICTION_RUN_PERIOD = new PropertyDescriptor.Builder()
.displayName("Time Between Eviction Runs")
.name("dbcp-time-between-eviction-runs")
.description("The number of milliseconds to sleep between runs of the idle connection evictor thread. When " +
"non-positive, no idle connection evictor thread will be run.")
.defaultValue(DEFAULT_EVICTION_RUN_PERIOD)
.required(false)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder()
.displayName("Minimum Evictable Idle Time")
.name("dbcp-min-evictable-idle-time")
.description("The minimum amount of time a connection may sit idle in the pool before it is eligible for eviction.")
.defaultValue(DEFAULT_MIN_EVICTABLE_IDLE_TIME)
.required(false)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor SOFT_MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder()
.displayName("Soft Minimum Evictable Idle Time")
.name("dbcp-soft-min-evictable-idle-time")
.description("The minimum amount of time a connection may sit idle in the pool before it is eligible for " +
"eviction by the idle connection evictor, with the extra condition that at least a minimum number of" +
" idle connections remain in the pool. When the not-soft version of this option is set to a positive" +
" value, it is examined first by the idle connection evictor: when idle connections are visited by " +
"the evictor, idle time is first compared against it (without considering the number of idle " +
"connections in the pool) and then against this soft option, including the minimum idle connections " +
"constraint.")
.defaultValue(DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME)
.required(false)
.addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
.description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
.identifiesControllerService(KerberosCredentialsService.class)
.required(false)
.build();
private File kerberosConfigFile = null;
private KerberosProperties kerberosProperties;
private List<PropertyDescriptor> properties;
private volatile BasicDataSource dataSource;
private volatile UserGroupInformation ugi;
private volatile KerberosUser kerberosUser;
private volatile Boolean foundHadoopDependencies;
// Holder of cached Configuration information so validation does not reload the same config over and over
private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>(null);
@Override
protected void init(final ControllerServiceInitializationContext context) {
kerberosConfigFile = context.getKerberosConfigurationFile();
kerberosProperties = getKerberosProperties(kerberosConfigFile);
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(DATABASE_URL);
props.add(DB_DRIVERNAME);
props.add(DB_DRIVER_LOCATION);
props.add(HADOOP_CONFIGURATION_RESOURCES);
props.add(KERBEROS_CREDENTIALS_SERVICE);
props.add(kerberosProperties.getKerberosPrincipal());
props.add(kerberosProperties.getKerberosKeytab());
props.add(kerberosProperties.getKerberosPassword());
props.add(DB_USER);
props.add(DB_PASSWORD);
props.add(MAX_WAIT_TIME);
props.add(MAX_TOTAL_CONNECTIONS);
props.add(VALIDATION_QUERY);
props.add(MIN_IDLE);
props.add(MAX_IDLE);
props.add(MAX_CONN_LIFETIME);
props.add(EVICTION_RUN_PERIOD);
props.add(MIN_EVICTABLE_IDLE_TIME);
props.add(SOFT_MIN_EVICTABLE_IDLE_TIME);
properties = Collections.unmodifiableList(props);
}
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
return new KerberosProperties(kerberosConfigFile);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dynamic(true)
.build();
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> problems = new ArrayList<>();
// Determine if we need to validate the presence of the required hadoop dependencies...
if (foundHadoopDependencies == null) {
final ClassLoader classLoader = this.getClass().getClassLoader();
try {
Class.forName(HADOOP_CONFIGURATION_CLASS, true, classLoader);
Class.forName(HADOOP_UGI_CLASS, true, classLoader);
foundHadoopDependencies = true;
} catch (ClassNotFoundException cnf) {
getLogger().debug(cnf.getMessage(), cnf);
foundHadoopDependencies = false;
}
}
// If hadoop classes are missing then we can't perform the rest of the validation, so short circuit and return...
if (!foundHadoopDependencies) {
problems.add(new ValidationResult.Builder()
.subject(DB_DRIVER_LOCATION.getDisplayName())
.valid(false)
.explanation("required Hadoop classes were not found in any of the specified resources, please ensure that hadoop-common is available")
.build());
return problems;
}
// Hadoop classes were found, so proceed with the rest of validation...
final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
final String explicitPassword = validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue();
final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String resolvedPrincipal;
final String resolvedKeytab;
if (credentialsService == null) {
resolvedPrincipal = explicitPrincipal;
resolvedKeytab = explicitKeytab;
} else {
resolvedPrincipal = credentialsService.getPrincipal();
resolvedKeytab = credentialsService.getKeytab();
}
final boolean confFileProvided = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).isSet();
if (confFileProvided) {
final String configFiles = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
ValidationResources resources = validationResourceHolder.get();
// if no resources in the holder, or if the holder has different resources loaded,
// then load the Configuration and set the new resources in the holder
if (resources == null || !configFiles.equals(resources.getConfigResources())) {
getLogger().debug("Reloading validation resources");
resources = new ValidationResources(configFiles, getConfigurationFromFiles(configFiles));
validationResourceHolder.set(resources);
}
final Configuration hadoopConfig = resources.getConfiguration();
problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(getClass().getSimpleName(), hadoopConfig,
resolvedPrincipal, resolvedKeytab, explicitPassword, getLogger()));
}
if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null || explicitPassword != null)) {
problems.add(new ValidationResult.Builder()
.subject("Kerberos Credentials")
.valid(false)
.explanation("Cannot specify a Kerberos Credentials Service while also specifying a Kerberos Principal, Kerberos Keytab, or Kerberos Password")
.build());
}
if (!isAllowExplicitKeytab() && explicitKeytab != null) {
problems.add(new ValidationResult.Builder()
.subject("Kerberos Credentials")
.valid(false)
.explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring Kerberos Keytab in processors. "
+ "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.")
.build());
}
return problems;
}
protected Configuration getConfigurationFromFiles(final String configFiles) {
final Configuration conf = new Configuration();
if (StringUtils.isNotBlank(configFiles)) {
for (final String configFile : configFiles.split(",")) {
conf.addResource(new Path(configFile.trim()));
}
}
return conf;
}
/**
* Configures connection pool by creating an instance of the
* {@link BasicDataSource} based on configuration provided with
* {@link ConfigurationContext}.
*
* This operation makes no guarantees that the actual connection could be
* made since the underlying system may still go off-line during normal
* operation of the connection pool.
*
* @param context
* the configuration context
* @throws IOException
* if Kerberos security is enabled but authentication fails, or no keytab/password is provided
*/
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws IOException {
// Get Configuration instance from specified resources
final String configFiles = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
final Configuration hadoopConfig = getConfigurationFromFiles(configFiles);
// Add any dynamic properties to the Hadoop Configuration
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
if (descriptor.isDynamic()) {
hadoopConfig.set(descriptor.getName(), context.getProperty(descriptor).evaluateAttributeExpressions().getValue());
}
}
// If security is enabled then determine how to authenticate based on the various principal/keytab/password options
if (SecurityUtil.isSecurityEnabled(hadoopConfig)) {
final String explicitPrincipal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
final String explicitKeytab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
final String explicitPassword = context.getProperty(kerberosProperties.getKerberosPassword()).getValue();
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String resolvedPrincipal;
final String resolvedKeytab;
if (credentialsService != null) {
resolvedPrincipal = credentialsService.getPrincipal();
resolvedKeytab = credentialsService.getKeytab();
} else {
resolvedPrincipal = explicitPrincipal;
resolvedKeytab = explicitKeytab;
}
if (resolvedKeytab != null) {
kerberosUser = new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab);
getLogger().info("Security Enabled, logging in as principal {} with keytab {}", new Object[] {resolvedPrincipal, resolvedKeytab});
} else if (explicitPassword != null) {
kerberosUser = new KerberosPasswordUser(resolvedPrincipal, explicitPassword);
getLogger().info("Security Enabled, logging in as principal {} with password", new Object[] {resolvedPrincipal});
} else {
throw new IOException("Unable to authenticate with Kerberos, no keytab or password was provided");
}
ugi = SecurityUtil.getUgiForKerberosUser(hadoopConfig, kerberosUser);
getLogger().info("Successfully logged in as principal " + resolvedPrincipal);
} else {
getLogger().info("Simple Authentication");
}
// Initialize the DataSource...
final String dbUrl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
final String driverName = context.getProperty(DB_DRIVERNAME).evaluateAttributeExpressions().getValue();
final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
final Long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions());
final Integer minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger();
final Integer maxIdle = context.getProperty(MAX_IDLE).evaluateAttributeExpressions().asInteger();
final Long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD).evaluateAttributeExpressions());
final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
dataSource = new BasicDataSource();
dataSource.setDriverClassName(driverName);
dataSource.setDriverClassLoader(this.getClass().getClassLoader());
dataSource.setUrl(dbUrl);
dataSource.setUsername(user);
dataSource.setPassword(passw);
dataSource.setMaxWaitMillis(maxWaitMillis);
dataSource.setMaxTotal(maxTotal);
dataSource.setMinIdle(minIdle);
dataSource.setMaxIdle(maxIdle);
dataSource.setMaxConnLifetimeMillis(maxConnLifetimeMillis);
dataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
dataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
dataSource.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis);
// only configure borrow-time validation when a validation query was actually provided
if (StringUtils.isNotEmpty(validationQuery)) {
dataSource.setValidationQuery(validationQuery);
dataSource.setTestOnBorrow(true);
}
}
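// Interprets the literal value "-1" as infinite (DBCP treats negative values as
// no limit/disabled); any other value is parsed as a NiFi time period,
// e.g. "30 mins" becomes 1800000 ms.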
private Long extractMillisWithInfinite(PropertyValue prop) {
return "-1".equals(prop.getValue()) ? -1 : prop.asTimePeriod(TimeUnit.MILLISECONDS);
}
/**
* Shutdown pool, close all open connections.
* If a principal is authenticated with a KDC, that principal is logged out.
*
* If a {@link LoginException} occurs while attempting to log out the {@link org.apache.nifi.security.krb.KerberosUser},
* an attempt will still be made to shut down the pool and close open connections.
*
* @throws SQLException if there is an error while closing open connections
* @throws LoginException if there is an error during the principal log out, and will only be thrown if there was
* no exception while closing open connections
*/
@OnDisabled
public void shutdown() throws SQLException, LoginException {
try {
if (kerberosUser != null) {
kerberosUser.logout();
}
} finally {
validationResourceHolder.set(null);
foundHadoopDependencies = null;
kerberosUser = null;
ugi = null;
try {
if (dataSource != null) {
dataSource.close();
}
} finally {
dataSource = null;
}
}
}
@Override
public Connection getConnection() throws ProcessException {
try {
if (ugi != null) {
// Explicitly check the TGT and relogin if necessary with the KerberosUser instance. No synchronization
// is necessary in the client code, since AbstractKerberosUser's checkTGTAndRelogin method is synchronized.
getLogger().trace("getting UGI instance");
if (kerberosUser != null) {
// if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring
getLogger().debug("kerberosUser is " + kerberosUser);
try {
getLogger().debug("checking TGT on kerberosUser " + kerberosUser);
kerberosUser.checkTGTAndRelogin();
} catch (LoginException e) {
throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
}
} else {
getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
// no synchronization is needed for UserGroupInformation.checkTGTAndReloginFromKeytab; UGI handles the synchronization internally
ugi.checkTGTAndReloginFromKeytab();
}
try {
return ugi.doAs((PrivilegedExceptionAction<Connection>) () -> dataSource.getConnection());
} catch (UndeclaredThrowableException e) {
Throwable cause = e.getCause();
if (cause instanceof SQLException) {
throw (SQLException) cause;
} else {
throw e;
}
}
} else {
getLogger().info("Simple Authentication");
return dataSource.getConnection();
}
} catch (SQLException | IOException | InterruptedException e) {
getLogger().error("Error getting Connection: " + e.getMessage(), e);
throw new ProcessException(e);
}
}
@Override
public String toString() {
return "HadoopDBCPConnectionPool[id=" + getIdentifier() + "]";
}
/*
* Overridable by subclasses in the same package, mainly intended for testing purposes to allow verification without having to set environment variables.
*/
boolean isAllowExplicitKeytab() {
return Boolean.parseBoolean(System.getenv(ALLOW_EXPLICIT_KEYTAB));
}
BasicDataSource getDataSource() {
return dataSource;
}
}
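For orientation, here is a minimal, hypothetical sketch (not part of this commit) of how a processor might consume this pool through the generic DBCPService interface; the dbcpServiceProperty parameter stands in for whatever PropertyDescriptor the calling processor defines with identifiesControllerService(DBCPService.class):

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;

public class PoolUsageSketch {
    void runQuery(final ProcessContext context, final PropertyDescriptor dbcpServiceProperty) {
        final DBCPService dbcp = context.getProperty(dbcpServiceProperty).asControllerService(DBCPService.class);
        // getConnection() performs the Kerberos relogin check and ugi.doAs() internally,
        // so callers only need standard JDBC resource handling.
        try (final Connection con = dbcp.getConnection();
             final Statement st = con.createStatement()) {
            st.execute("SELECT 1");
        } catch (final SQLException e) {
            throw new ProcessException("Query failed", e);
        }
    }
}

Note that getConnection() already throws ProcessException on failure, so no extra wrapping is required for pool-level errors.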

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dbcp;
import org.apache.hadoop.conf.Configuration;
public class ValidationResources {
private final String configResources;
private final Configuration configuration;
public ValidationResources(String configResources, Configuration configuration) {
this.configResources = configResources;
this.configuration = configuration;
}
public String getConfigResources() {
return configResources;
}
public Configuration getConfiguration() {
return configuration;
}
}
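ValidationResources simply pairs the configured resource string with its parsed Configuration so that customValidate can skip re-reading the files when nothing has changed. A hedged sketch of that caching pattern, assuming the pool's AtomicReference&lt;ValidationResources&gt; validationResourceHolder (the field reset in shutdown() above) and a getConfigurationFromResources(...) helper of the kind NiFi's Hadoop components use:

// Re-parse the Hadoop config only when the configured file list changes.
ValidationResources resources = validationResourceHolder.get();
if (resources == null || !configFiles.equals(resources.getConfigResources())) {
    resources = new ValidationResources(configFiles,
            getConfigurationFromResources(new Configuration(), configFiles));
    validationResourceHolder.set(resources);
}
final Configuration hadoopConfig = resources.getConfiguration();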

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.dbcp.HadoopDBCPConnectionPool
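This provider-configuration entry is how the service becomes discoverable inside the NAR; the mechanism is the standard java.util.ServiceLoader contract. A generic illustration (not NiFi's actual extension-manager code):

import java.util.ServiceLoader;

import org.apache.nifi.controller.ControllerService;

public class DiscoverySketch {
    public static void main(final String[] args) {
        // Scans META-INF/services/org.apache.nifi.controller.ControllerService
        // files visible to the current class loader and instantiates each entry.
        for (final ControllerService service : ServiceLoader.load(ControllerService.class)) {
            System.out.println(service.getClass().getName());
        }
    }
}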

View File

@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dbcp;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.kerberos.KerberosContext;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockKerberosContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
public class HadoopDBCPConnectionPoolTest {
private File krbConfFile;
private KerberosProperties kerberosProps;
private KerberosContext kerberosContext;
@Before
public void setup() {
krbConfFile = new File("src/test/resources/krb5.conf");
kerberosProps = new KerberosProperties(krbConfFile);
kerberosContext = new MockKerberosContext(krbConfFile);
}
@Test
public void testCustomValidateWhenAllowExplicitKeytab() throws InitializationException {
final Processor testProcessor = new TestProcessor();
final TestRunner runner = TestRunners.newTestRunner(testProcessor, kerberosContext);
// Configure minimum required properties.
final HadoopDBCPConnectionPool hadoopDBCPService = new TestableHadoopDBCPConnectionPool(true);
runner.addControllerService("hadoop-dbcp-service", hadoopDBCPService);
runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DATABASE_URL, "jdbc:phoenix:zk-host1,zk-host2:2181:/hbase");
runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DB_DRIVERNAME, "org.apache.phoenix.jdbc.PhoenixDriver");
runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DB_DRIVER_LOCATION, "target");
// Security is not enabled yet since no conf files are provided, so the service should be valid
runner.assertValid(hadoopDBCPService);
// Enable security, should be invalid until some form of Kerberos credentials is provided
runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site-security.xml");
runner.assertNotValid(hadoopDBCPService);
// Configure principal and keytab, should be valid
runner.setProperty(hadoopDBCPService, kerberosProps.getKerberosPrincipal(), "nifi@EXAMPLE.COM");
runner.setProperty(hadoopDBCPService, kerberosProps.getKerberosKeytab(), "src/test/resources/fake.keytab");
runner.assertValid(hadoopDBCPService);
// Configure password, should become invalid
runner.setProperty(hadoopDBCPService, kerberosProps.getKerberosPassword(), "password");
runner.assertNotValid(hadoopDBCPService);
// Remove keytab property, should become valid
runner.removeProperty(hadoopDBCPService, kerberosProps.getKerberosKeytab());
runner.assertValid(hadoopDBCPService);
// Configure a KerberosCredentialsService, should become invalid
final KerberosCredentialsService kerberosCredentialsService = new MockKerberosCredentialsService(
"nifi@EXAMPLE.COM", "src/test/resources/fake.keytab");
runner.addControllerService("kerb-credentials", kerberosCredentialsService);
runner.enableControllerService(kerberosCredentialsService);
runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE, "kerb-credentials");
runner.assertNotValid(hadoopDBCPService);
// Remove password property, still invalid
runner.removeProperty(hadoopDBCPService, kerberosProps.getKerberosPassword());
runner.assertNotValid(hadoopDBCPService);
// Remove principal property, relying only on the credentials service, should become valid
runner.removeProperty(hadoopDBCPService, kerberosProps.getKerberosPrincipal());
runner.assertValid(hadoopDBCPService);
}
@Test
public void testCustomValidateWhenNotAllowExplicitKeytab() throws InitializationException {
final Processor testProcessor = new TestProcessor();
final TestRunner runner = TestRunners.newTestRunner(testProcessor, kerberosContext);
// Configure minimum required properties.
final HadoopDBCPConnectionPool hadoopDBCPService = new TestableHadoopDBCPConnectionPool(false);
runner.addControllerService("hadoop-dbcp-service", hadoopDBCPService);
runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DATABASE_URL, "jdbc:phoenix:zk-host1,zk-host2:2181:/hbase");
runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DB_DRIVERNAME, "org.apache.phoenix.jdbc.PhoenixDriver");
runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.DB_DRIVER_LOCATION, "target");
// Security is not enabled yet since no conf files are provided, so the service should be valid
runner.assertValid(hadoopDBCPService);
// Enable security, should be invalid until some form of Kerberos credentials is provided
runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site-security.xml");
runner.assertNotValid(hadoopDBCPService);
// Configure an explicit principal, should still be invalid
runner.setProperty(hadoopDBCPService, kerberosProps.getKerberosPrincipal(), "nifi@EXAMPLE.COM");
runner.assertNotValid(hadoopDBCPService);
}
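// Overrides isAllowExplicitKeytab() so tests can exercise both modes without
// depending on the real environment variable.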
private static final class TestableHadoopDBCPConnectionPool extends HadoopDBCPConnectionPool {
private final boolean allowExplicitKeytab;
public TestableHadoopDBCPConnectionPool(boolean allowExplicitKeytab) {
this.allowExplicitKeytab = allowExplicitKeytab;
}
@Override
boolean isAllowExplicitKeytab() {
return allowExplicitKeytab;
}
}
private static class MockKerberosCredentialsService extends AbstractControllerService implements KerberosCredentialsService {
private final String principal;
private final String keytab;
public MockKerberosCredentialsService(String principal, String keytab) {
this.principal = principal;
this.keytab = keytab;
}
@Override
public String getKeytab() {
return keytab;
}
@Override
public String getPrincipal() {
return principal;
}
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.dbcp;
import java.util.ArrayList;
import java.util.List;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
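// Minimal processor whose only purpose is to reference a DBCPService so the
// TestRunner can register, configure, and validate the controller service.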
public class TestProcessor extends AbstractProcessor {
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> propDescs = new ArrayList<>();
propDescs.add(new PropertyDescriptor.Builder()
.name("DBCPService test processor")
.description("DBCPService test processor")
.identifiesControllerService(DBCPService.class)
.required(true)
.build());
return propDescs;
}
}

View File

@ -0,0 +1,30 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://hbase</value>
</property>
<property>
<name>hadoop.security.authentication</name>
<value>kerberos</value>
</property>
<property>
<name>hadoop.security.authorization</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,22 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://hbase</value>
</property>
</configuration>

View File

@ -0,0 +1,12 @@
[libdefaults]
default_realm = EXAMPLE.COM
[realms]
EXAMPLE.COM = {
kdc = kdc1.example.com
kdc = kdc2.example.com
admin_server = kdc1.example.com
}
[domain_realm]
.example.com = EXAMPLE.COM
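The test setup() hands this file to MockKerberosContext; outside of tests, a JVM is normally pointed at a krb5.conf through the java.security.krb5.conf system property, e.g.:

// Illustrative only: how a JVM typically locates a krb5.conf
System.setProperty("java.security.krb5.conf", "src/test/resources/krb5.conf");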

View File

@ -0,0 +1,29 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services</artifactId>
<version>1.12.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-hadoop-dbcp-service-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-hadoop-dbcp-service</module>
<module>nifi-hadoop-dbcp-service-nar</module>
</modules>
</project>

View File

@ -49,5 +49,6 @@
<module>nifi-rules-engine-service-api</module>
<module>nifi-record-sink-api</module>
<module>nifi-record-sink-service-bundle</module>
<module>nifi-hadoop-dbcp-service-bundle</module>
</modules>
</project>