NIFI-4915 Adding HBase 2.x service bundle

Thise closes #3254

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
Bryan Bende 2019-01-09 11:02:14 -05:00 committed by Mike Thomsen
parent fd3d69bc90
commit 410c9a4ecd
26 changed files with 3657 additions and 5 deletions

View File

@ -1,5 +1,5 @@
Apache NiFi
Copyright 2014-2018 The Apache Software Foundation
Copyright 2014-2019 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
@ -74,7 +74,7 @@ The following binary components are provided under the Apache Software License v
(ASLv2) Apache Commons Collections
The following NOTICE information applies:
Apache Commons Collections
Copyright 2001-2013 The Apache Software Foundation
Copyright 2001-2016 The Apache Software Foundation
(ASLv2) Apache Commons Compress
The following NOTICE information applies:
@ -87,6 +87,11 @@ The following binary components are provided under the Apache Software License v
"LZMA SDK is placed in the public domain." (http://www.7-zip.org/sdk.html)
(ASLv2) Apache Commons Crypto
The following NOTICE information applies:
Apache Commons Crypto
Copyright 2016-2016 The Apache Software Foundation
(ASLv2) Jettison
The following NOTICE information applies:
Copyright 2006 Envoi Solutions LLC
@ -173,7 +178,7 @@ The following binary components are provided under the Apache Software License v
(ASLv2) Apache Commons Logging
The following NOTICE information applies:
Apache Commons Logging
Copyright 2003-2014 The Apache Software Foundation
Copyright 2003-2016 The Apache Software Foundation
(ASLv2) Apache Commons VFS
The following NOTICE information applies:
@ -1079,8 +1084,10 @@ The following binary components are provided under the Apache Software License v
(ASLv2) HBase Common
The following NOTICE information applies:
This product includes portions of the Guava project v14, specifically
This product includes portions of the Guava project v14 and v21, specifically
'hbase-common/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java'
'hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java'
'hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java'
Copyright (C) 2007 The Guava Authors
@ -1729,6 +1736,11 @@ The following binary components are provided under the Apache Software License v
Copyright 2018 simple-syslog-5424 authors.
(ASLv2) Audience Annotations
The following NOTICE information applies:
Apache Yetus
Copyright 2008-2018 The Apache Software Foundation
************************
Common Development and Distribution License 1.1
************************

View File

@ -433,6 +433,12 @@ language governing permissions and limitations under the License. -->
<version>1.9.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase_2-client-service-nar</artifactId>
<version>1.9.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-azure-nar</artifactId>

View File

@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase_2-client-service-bundle</artifactId>
<version>1.9.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-hbase_2-client-service-nar</artifactId>
<version>1.9.0-SNAPSHOT</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>1.9.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase_2-client-service</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,357 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
APACHE NIFI SUBCOMPONENTS:
The Apache NiFi project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for the these
subcomponents is subject to the terms and conditions of the following
licenses.
The binary distribution of this product bundles 'Jcodings' under an MIT style
license.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
The binary distribution of this product bundles 'Joni' under an MIT style
license.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
The binary distribution of this product bundles 'Google Protocol Buffers Java 2.5.0'
which is licensed under a BSD license.
This license applies to all parts of Protocol Buffers except the following:
- Atomicops support for generic gcc, located in
src/google/protobuf/stubs/atomicops_internals_generic_gcc.h.
This file is copyrighted by Red Hat Inc.
- Atomicops support for AIX/POWER, located in
src/google/protobuf/stubs/atomicops_internals_aix.h.
This file is copyrighted by Bloomberg Finance LP.
Copyright 2014, Google Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Code generated by the Protocol Buffer compiler is owned by the owner
of the input file used when generating it. This code is not
standalone and requires a support library to be linked with it. This
support library is itself covered by the above license.
The binary distribution of this product bundles 'Paranamer Core' which is available
under a BSD style license.
Copyright (c) 2006 Paul Hammant & ThoughtWorks Inc
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holders nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
THE POSSIBILITY OF SUCH DAMAGE.
The binary distribution of this product bundles 'JCraft Jsch' which is available
under a BSD style license.
Copyright (c) 2002-2014 Atsuhiko Yamanaka, JCraft,Inc.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
3. The names of the authors may not be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JCRAFT,
INC. OR ANY CONTRIBUTORS TO THIS SOFTWARE BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -0,0 +1,316 @@
nifi-hbase_2-client-service-nar
Copyright 2014-2019 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
(ASLv2) Apache Commons CLI
The following NOTICE information applies:
Apache Commons CLI
Copyright 2001-2009 The Apache Software Foundation
(ASLv2) Apache Curator
The following NOTICE information applies:
Curator Framework
Copyright 2011-2014 The Apache Software Foundation
Curator Client
Copyright 2011-2014 The Apache Software Foundation
Curator Recipes
Copyright 2011-2014 The Apache Software Foundation
(ASLv2) Apache Directory Server
The following NOTICE information applies:
ApacheDS Protocol Kerberos Codec
Copyright 2003-2013 The Apache Software Foundation
ApacheDS I18n
Copyright 2003-2013 The Apache Software Foundation
Apache Directory API ASN.1 API
Copyright 2003-2013 The Apache Software Foundation
Apache Directory LDAP API Utilities
Copyright 2003-2013 The Apache Software Foundation
(ASLv2) Apache Commons Math
The following NOTICE information applies:
Apache Commons Math
Copyright 2001-2012 The Apache Software Foundation
This product includes software developed by
The Apache Software Foundation (http://www.apache.org/).
===============================================================================
The BracketFinder (package org.apache.commons.math3.optimization.univariate)
and PowellOptimizer (package org.apache.commons.math3.optimization.general)
classes are based on the Python code in module "optimize.py" (version 0.5)
developed by Travis E. Oliphant for the SciPy library (http://www.scipy.org/)
Copyright © 2003-2009 SciPy Developers.
===============================================================================
The LinearConstraint, LinearObjectiveFunction, LinearOptimizer,
RelationShip, SimplexSolver and SimplexTableau classes in package
org.apache.commons.math3.optimization.linear include software developed by
Benjamin McCann (http://www.benmccann.com) and distributed with
the following copyright: Copyright 2009 Google Inc.
===============================================================================
This product includes software developed by the
University of Chicago, as Operator of Argonne National
Laboratory.
The LevenbergMarquardtOptimizer class in package
org.apache.commons.math3.optimization.general includes software
translated from the lmder, lmpar and qrsolv Fortran routines
from the Minpack package
Minpack Copyright Notice (1999) University of Chicago. All rights reserved
===============================================================================
The GraggBulirschStoerIntegrator class in package
org.apache.commons.math3.ode.nonstiff includes software translated
from the odex Fortran routine developed by E. Hairer and G. Wanner.
Original source copyright:
Copyright (c) 2004, Ernst Hairer
===============================================================================
The EigenDecompositionImpl class in package
org.apache.commons.math3.linear includes software translated
from some LAPACK Fortran routines. Original source copyright:
Copyright (c) 1992-2008 The University of Tennessee. All rights reserved.
===============================================================================
The MersenneTwister class in package org.apache.commons.math3.random
includes software translated from the 2002-01-26 version of
the Mersenne-Twister generator written in C by Makoto Matsumoto and Takuji
Nishimura. Original source copyright:
Copyright (C) 1997 - 2002, Makoto Matsumoto and Takuji Nishimura,
All rights reserved
===============================================================================
The LocalizedFormatsTest class in the unit tests is an adapted version of
the OrekitMessagesTest class from the orekit library distributed under the
terms of the Apache 2 licence. Original source copyright:
Copyright 2010 CS Systèmes d'Information
===============================================================================
The HermiteInterpolator class and its corresponding test have been imported from
the orekit library distributed under the terms of the Apache 2 licence. Original
source copyright:
Copyright 2010-2012 CS Systèmes d'Information
===============================================================================
The creation of the package "o.a.c.m.analysis.integration.gauss" was inspired
by an original code donated by Sébastien Brisard.
===============================================================================
(ASLv2) Apache Jakarta HttpClient
The following NOTICE information applies:
Apache Jakarta HttpClient
Copyright 1999-2007 The Apache Software Foundation
(ASLv2) Apache Commons Codec
The following NOTICE information applies:
Apache Commons Codec
Copyright 2002-2014 The Apache Software Foundation
src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
contains test data from http://aspell.net/test/orig/batch0.tab.
Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
===============================================================================
The content of package org.apache.commons.codec.language.bm has been translated
from the original php source code available at http://stevemorse.org/phoneticinfo.htm
with permission from the original authors.
Original source copyright:
Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
(ASLv2) Apache Commons IO
The following NOTICE information applies:
Apache Commons IO
Copyright 2002-2016 The Apache Software Foundation
(ASLv2) Apache Commons Net
The following NOTICE information applies:
Apache Commons Net
Copyright 2001-2013 The Apache Software Foundation
(ASLv2) Apache Commons Collections
The following NOTICE information applies:
Apache Commons Collections
Copyright 2001-2016 The Apache Software Foundation
(ASLv2) Apache Commons Crypto
The following NOTICE information applies:
Apache Commons Crypto
Copyright 2016-2016 The Apache Software Foundation
(ASLv2) Jettison
The following NOTICE information applies:
Copyright 2006 Envoi Solutions LLC
(ASLv2) Apache Commons Logging
The following NOTICE information applies:
Apache Commons Logging
Copyright 2003-2016 The Apache Software Foundation
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2011 The Apache Software Foundation
(ASLv2) Apache log4j
The following NOTICE information applies:
Apache log4j
Copyright 2007 The Apache Software Foundation
(ASLv2) Apache HttpComponents
The following NOTICE information applies:
Apache HttpClient
Copyright 1999-2015 The Apache Software Foundation
Apache HttpComponents HttpCore
Copyright 2005-2011 The Apache Software Foundation
(ASLv2) Apache Commons Configuration
The following NOTICE information applies:
Apache Commons Configuration
Copyright 2001-2008 The Apache Software Foundation
(ASLv2) Apache Jakarta Commons Digester
The following NOTICE information applies:
Apache Jakarta Commons Digester
Copyright 2001-2006 The Apache Software Foundation
(ASLv2) Apache Commons BeanUtils
The following NOTICE information applies:
Apache Commons BeanUtils
Copyright 2000-2008 The Apache Software Foundation
(ASLv2) Apache Avro
The following NOTICE information applies:
Apache Avro
Copyright 2009-2017 The Apache Software Foundation
(ASLv2) Snappy Java
The following NOTICE information applies:
This product includes software developed by Google
Snappy: http://code.google.com/p/snappy/ (New BSD License)
This product includes software developed by Apache
PureJavaCrc32C from apache-hadoop-common http://hadoop.apache.org/
(Apache 2.0 license)
This library containd statically linked libstdc++. This inclusion is allowed by
"GCC RUntime Library Exception"
http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html
(ASLv2) ApacheDS
The following NOTICE information applies:
ApacheDS
Copyright 2003-2013 The Apache Software Foundation
(ASLv2) Apache ZooKeeper
The following NOTICE information applies:
Apache ZooKeeper
Copyright 2009-2012 The Apache Software Foundation
(ASLv2) Apache Commons Compress
The following NOTICE information applies:
Apache Commons Compress
Copyright 2002-2017 The Apache Software Foundation
The files in the package org.apache.commons.compress.archivers.sevenz
were derived from the LZMA SDK, version 9.20 (C/ and CPP/7zip/),
which has been placed in the public domain:
"LZMA SDK is placed in the public domain." (http://www.7-zip.org/sdk.html)
(ASLv2) Apache Commons Daemon
The following NOTICE information applies:
Apache Commons Daemon
Copyright 1999-2013 The Apache Software Foundation
(ASLv2) The Netty Project
The following NOTICE information applies:
The Netty Project
Copyright 2011 The Netty Project
(ASLv2) Apache Xerces Java
The following NOTICE information applies:
Apache Xerces Java
Copyright 1999-2007 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
Portions of this software were originally based on the following:
- software copyright (c) 1999, IBM Corporation., http://www.ibm.com.
- software copyright (c) 1999, Sun Microsystems., http://www.sun.com.
- voluntary contributions made by Paul Eng on behalf of the
Apache Software Foundation that were originally developed at iClick, Inc.,
software copyright (c) 1999.
(ASLv2) Google Guice
The following NOTICE information applies:
Google Guice - Core Library
Copyright 2006-2011 Google, Inc.
Google Guice - Extensions - Servlet
Copyright 2006-2011 Google, Inc.
(ASLv2) HBase Common
The following NOTICE information applies:
This product includes portions of the Guava project v14 and v21, specifically
'hbase-common/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java'
'hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java'
'hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java'
Copyright (C) 2007 The Guava Authors
Licensed under the Apache License, Version 2.0
(ASLv2) Apache HTrace Core
The following NOTICE information applies:
Copyright 2016 The Apache Software Foundation
Apache HTrace includes an Apache Thrift connector to Zipkin. Zipkin
is a distributed tracing system that is Apache 2.0 Licensed.
Copyright 2012 Twitter, Inc.
(ASLv2) Jackson Core ASL
The following NOTICE information applies:
This product currently only contains code developed by authors
of specific components, as identified by the source code files;
if such notes are missing files have been created by
Tatu Saloranta.
For additional credits (generally to people who reported problems)
see CREDITS file.
(ASLv2) Jackson Mapper ASL
The following NOTICE information applies:
This product currently only contains code developed by authors
of specific components, as identified by the source code files;
if such notes are missing files have been created by
Tatu Saloranta.
For additional credits (generally to people who reported problems)
see CREDITS file.
(ASLv2) Audience Annotations
The following NOTICE information applies:
Apache Yetus
Copyright 2008-2018 The Apache Software Foundation
(ASLv2) Jetty
The following NOTICE information applies:
Jetty Web Container
Copyright 1995-2017 Mort Bay Consulting Pty Ltd.

View File

@ -0,0 +1,108 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase_2-client-service-bundle</artifactId>
<version>1.9.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-hbase_2-client-service</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase-client-service-api</artifactId>
<version>1.9.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lookup-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-utils</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.github.stephenc.findbugs</groupId>
<artifactId>findbugs-annotations</artifactId>
<version>1.3.9-1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.8.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.9.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,251 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.reporting.InitializationException;
import java.nio.charset.StandardCharsets;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.processor.util.StandardValidators;
import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
@Tags({"distributed", "cache", "state", "map", "cluster","hbase"})
@SeeAlso(classNames = {"org.apache.nifi.hbase.HBase_2_ClientService"})
@CapabilityDescription("Provides the ability to use an HBase table as a cache, in place of a DistributedMapCache."
+ " Uses a HBase_2_ClientService controller to communicate with HBase.")
public class HBase_2_ClientMapCacheService extends AbstractControllerService implements DistributedMapCacheClient {
static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("HBase Client Service")
.description("Specifies the HBase Client Controller Service to use for accessing HBase.")
.required(true)
.identifiesControllerService(HBaseClientService.class)
.build();
public static final PropertyDescriptor HBASE_CACHE_TABLE_NAME = new PropertyDescriptor.Builder()
.name("HBase Cache Table Name")
.description("Name of the table on HBase to use for the cache.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor HBASE_COLUMN_FAMILY = new PropertyDescriptor.Builder()
.name("HBase Column Family")
.description("Name of the column family on HBase to use for the cache.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("f")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor HBASE_COLUMN_QUALIFIER = new PropertyDescriptor.Builder()
.name("HBase Column Qualifier")
.description("Name of the column qualifier on HBase to use for the cache")
.defaultValue("q")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(HBASE_CACHE_TABLE_NAME);
descriptors.add(AUTHORIZATIONS);
descriptors.add(HBASE_CLIENT_SERVICE);
descriptors.add(HBASE_COLUMN_FAMILY);
descriptors.add(HBASE_COLUMN_QUALIFIER);
return descriptors;
}
// Other threads may call @OnEnabled so these are marked volatile to ensure other class methods read the updated value
private volatile String hBaseCacheTableName;
private volatile HBaseClientService hBaseClientService;
private volatile String hBaseColumnFamily;
private volatile byte[] hBaseColumnFamilyBytes;
private volatile String hBaseColumnQualifier;
private volatile byte[] hBaseColumnQualifierBytes;
private List<String> authorizations;
@OnEnabled
public void onConfigured(final ConfigurationContext context) throws InitializationException{
hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
hBaseCacheTableName = context.getProperty(HBASE_CACHE_TABLE_NAME).evaluateAttributeExpressions().getValue();
hBaseColumnFamily = context.getProperty(HBASE_COLUMN_FAMILY).evaluateAttributeExpressions().getValue();
hBaseColumnQualifier = context.getProperty(HBASE_COLUMN_QUALIFIER).evaluateAttributeExpressions().getValue();
hBaseColumnFamilyBytes = hBaseColumnFamily.getBytes(StandardCharsets.UTF_8);
hBaseColumnQualifierBytes = hBaseColumnQualifier.getBytes(StandardCharsets.UTF_8);
authorizations = VisibilityLabelUtils.getAuthorizations(context);
}
private <T> byte[] serialize(final T value, final Serializer<T> serializer) throws IOException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
serializer.serialize(value, baos);
return baos.toByteArray();
}
private <T> T deserialize(final byte[] value, final Deserializer<T> deserializer) throws IOException {
return deserializer.deserialize(value);
}
@Override
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
final byte[] rowIdBytes = serialize(key, keySerializer);
final byte[] valueBytes = serialize(value, valueSerializer);
final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes);
return hBaseClientService.checkAndPut(hBaseCacheTableName, rowIdBytes, hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, null, putColumn);
}
@Override
public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
List<PutColumn> putColumns = new ArrayList<PutColumn>(1);
final byte[] rowIdBytes = serialize(key, keySerializer);
final byte[] valueBytes = serialize(value, valueSerializer);
final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes);
putColumns.add(putColumn);
hBaseClientService.put(hBaseCacheTableName, rowIdBytes, putColumns);
}
@Override
public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
final byte[] rowIdBytes = serialize(key, keySerializer);
final HBaseRowHandler handler = new HBaseRowHandler();
final List<Column> columnsList = new ArrayList<Column>(0);
hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, authorizations, handler);
return (handler.numRows() > 0);
}
/**
* Note that the implementation of getAndPutIfAbsent is not atomic.
* The putIfAbsent is atomic, but a getAndPutIfAbsent does a get and then a putIfAbsent.
* If there is an existing value and it is updated in betweern the two steps, then the existing (unmodified) value will be returned.
* If the existing value was deleted between the two steps, getAndPutIfAbsent will correctly return null.
* This should not generally be an issue with cache processors such as DetectDuplicate.
*
*/
@Override
public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer) throws IOException {
// Between the get and the putIfAbsent, the value could be deleted or updated.
// Logic below takes care of the deleted case but not the updated case.
// This is probably fine since DistributedMapCache and DetectDuplicate expect to receive the original cache value
// Could possibly be fixed by implementing AtomicDistributedMapCache (Map Cache protocol version 2)
final V got = get(key, keySerializer, valueDeserializer);
final boolean wasAbsent = putIfAbsent(key, value, keySerializer, valueSerializer);
if (! wasAbsent) return got;
else return null;
}
@Override
public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
final byte[] rowIdBytes = serialize(key, keySerializer);
final HBaseRowHandler handler = new HBaseRowHandler();
final List<Column> columnsList = new ArrayList<Column>(0);
hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, authorizations, handler);
if (handler.numRows() > 1) {
throw new IOException("Found multiple rows in HBase for key");
} else if(handler.numRows() == 1) {
return deserialize( handler.getLastResultBytes(), valueDeserializer);
} else {
return null;
}
}
@Override
public <K> boolean remove(final K key, final Serializer<K> keySerializer) throws IOException {
final boolean contains = containsKey(key, keySerializer);
if (contains) {
final byte[] rowIdBytes = serialize(key, keySerializer);
hBaseClientService.delete(hBaseCacheTableName, rowIdBytes);
}
return contains;
}
@Override
public long removeByPattern(String regex) throws IOException {
throw new IOException("HBase removeByPattern is not implemented");
}
@Override
public void close() throws IOException {
}
@Override
protected void finalize() throws Throwable {
}
private class HBaseRowHandler implements ResultHandler {
private int numRows = 0;
private byte[] lastResultBytes;
@Override
public void handle(byte[] row, ResultCell[] resultCells) {
numRows += 1;
for( final ResultCell resultCell : resultCells ){
lastResultBytes = Arrays.copyOfRange(resultCell.getValueArray(), resultCell.getValueOffset(), resultCell.getValueLength() + resultCell.getValueOffset());
}
}
public int numRows() {
return numRows;
}
public byte[] getLastResultBytes() {
return lastResultBytes;
}
}
}

View File

@ -0,0 +1,842 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.ParseFilter;
import org.apache.hadoop.hbase.security.visibility.Authorizations;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.hbase.validate.ConfigFilesValidator;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@RequiresInstanceClassLoading
@Tags({ "hbase", "client"})
@CapabilityDescription("Implementation of HBaseClientService using the HBase 1.1.x client. Although this service was originally built with the 1.1.2 " +
"client and has 1_1_2 in it's name, the client library has since been upgraded to 1.1.13 to leverage bug fixes. This service can be configured " +
"by providing a comma-separated list of configuration files, or by specifying values for the other properties. If configuration files " +
"are provided, they will be loaded first, and the values of the additional properties will override the values from " +
"the configuration files. In addition, any user defined properties on the processor will also be passed to the HBase " +
"configuration.")
@DynamicProperty(name="The name of an HBase configuration property.", value="The value of the given HBase configuration property.",
description="These properties will be set on the HBase configuration after loading any provided configuration files.")
public class HBase_2_ClientService extends AbstractControllerService implements HBaseClientService {
private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
private static final Logger logger = LoggerFactory.getLogger(HBase_2_ClientService.class);
static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
.description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
.identifiesControllerService(KerberosCredentialsService.class)
.required(false)
.build();
static final PropertyDescriptor HADOOP_CONF_FILES = new PropertyDescriptor.Builder()
.name("Hadoop Configuration Files")
.description("Comma-separated list of Hadoop Configuration files," +
" such as hbase-site.xml and core-site.xml for kerberos, " +
"including full paths to the files.")
.addValidator(new ConfigFilesValidator())
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor ZOOKEEPER_QUORUM = new PropertyDescriptor.Builder()
.name("ZooKeeper Quorum")
.description("Comma-separated list of ZooKeeper hosts for HBase. Required if Hadoop Configuration Files are not provided.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor ZOOKEEPER_CLIENT_PORT = new PropertyDescriptor.Builder()
.name("ZooKeeper Client Port")
.description("The port on which ZooKeeper is accepting client connections. Required if Hadoop Configuration Files are not provided.")
.addValidator(StandardValidators.PORT_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor ZOOKEEPER_ZNODE_PARENT = new PropertyDescriptor.Builder()
.name("ZooKeeper ZNode Parent")
.description("The ZooKeeper ZNode Parent value for HBase (example: /hbase). Required if Hadoop Configuration Files are not provided.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor HBASE_CLIENT_RETRIES = new PropertyDescriptor.Builder()
.name("HBase Client Retries")
.description("The number of times the HBase client will retry connecting. Required if Hadoop Configuration Files are not provided.")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("1")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor PHOENIX_CLIENT_JAR_LOCATION = new PropertyDescriptor.Builder()
.name("Phoenix Client JAR Location")
.description("The full path to the Phoenix client JAR. Required if Phoenix is installed on top of HBase.")
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dynamicallyModifiesClasspath(true)
.build();
static final String HBASE_CONF_ZK_QUORUM = "hbase.zookeeper.quorum";
static final String HBASE_CONF_ZK_PORT = "hbase.zookeeper.property.clientPort";
static final String HBASE_CONF_ZNODE_PARENT = "zookeeper.znode.parent";
static final String HBASE_CONF_CLIENT_RETRIES = "hbase.client.retries.number";
private volatile Connection connection;
private volatile UserGroupInformation ugi;
private volatile String masterAddress;
private List<PropertyDescriptor> properties;
private KerberosProperties kerberosProperties;
private volatile File kerberosConfigFile = null;
// Holder of cached Configuration information so validation does not reload the same config over and over
private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
protected Connection getConnection() {
return connection;
}
protected void setConnection(Connection connection) {
this.connection = connection;
}
@Override
protected void init(ControllerServiceInitializationContext config) throws InitializationException {
kerberosConfigFile = config.getKerberosConfigurationFile();
kerberosProperties = getKerberosProperties(kerberosConfigFile);
List<PropertyDescriptor> props = new ArrayList<>();
props.add(HADOOP_CONF_FILES);
props.add(KERBEROS_CREDENTIALS_SERVICE);
props.add(kerberosProperties.getKerberosPrincipal());
props.add(kerberosProperties.getKerberosKeytab());
props.add(ZOOKEEPER_QUORUM);
props.add(ZOOKEEPER_CLIENT_PORT);
props.add(ZOOKEEPER_ZNODE_PARENT);
props.add(HBASE_CLIENT_RETRIES);
props.add(PHOENIX_CLIENT_JAR_LOCATION);
props.addAll(getAdditionalProperties());
this.properties = Collections.unmodifiableList(props);
}
protected List<PropertyDescriptor> getAdditionalProperties() {
return new ArrayList<>();
}
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
return new KerberosProperties(kerberosConfigFile);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.description("Specifies the value for '" + propertyDescriptorName + "' in the HBase configuration.")
.name(propertyDescriptorName)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.dynamic(true)
.build();
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
boolean confFileProvided = validationContext.getProperty(HADOOP_CONF_FILES).isSet();
boolean zkQuorumProvided = validationContext.getProperty(ZOOKEEPER_QUORUM).isSet();
boolean zkPortProvided = validationContext.getProperty(ZOOKEEPER_CLIENT_PORT).isSet();
boolean znodeParentProvided = validationContext.getProperty(ZOOKEEPER_ZNODE_PARENT).isSet();
boolean retriesProvided = validationContext.getProperty(HBASE_CLIENT_RETRIES).isSet();
final String explicitPrincipal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String resolvedPrincipal;
final String resolvedKeytab;
if (credentialsService == null) {
resolvedPrincipal = explicitPrincipal;
resolvedKeytab = explicitKeytab;
} else {
resolvedPrincipal = credentialsService.getPrincipal();
resolvedKeytab = credentialsService.getKeytab();
}
final List<ValidationResult> problems = new ArrayList<>();
if (!confFileProvided && (!zkQuorumProvided || !zkPortProvided || !znodeParentProvided || !retriesProvided)) {
problems.add(new ValidationResult.Builder()
.valid(false)
.subject(this.getClass().getSimpleName())
.explanation("ZooKeeper Quorum, ZooKeeper Client Port, ZooKeeper ZNode Parent, and HBase Client Retries are required " +
"when Hadoop Configuration Files are not provided.")
.build());
}
if (confFileProvided) {
final String configFiles = validationContext.getProperty(HADOOP_CONF_FILES).evaluateAttributeExpressions().getValue();
ValidationResources resources = validationResourceHolder.get();
// if no resources in the holder, or if the holder has different resources loaded,
// then load the Configuration and set the new resources in the holder
if (resources == null || !configFiles.equals(resources.getConfigResources())) {
getLogger().debug("Reloading validation resources");
resources = new ValidationResources(configFiles, getConfigurationFromFiles(configFiles));
validationResourceHolder.set(resources);
}
final Configuration hbaseConfig = resources.getConfiguration();
problems.addAll(KerberosProperties.validatePrincipalAndKeytab(getClass().getSimpleName(), hbaseConfig, resolvedPrincipal, resolvedKeytab, getLogger()));
}
if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) {
problems.add(new ValidationResult.Builder()
.subject("Kerberos Credentials")
.valid(false)
.explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab")
.build());
}
final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB);
if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) {
problems.add(new ValidationResult.Builder()
.subject("Kerberos Credentials")
.valid(false)
.explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. "
+ "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.")
.build());
}
return problems;
}
/**
* As of Apache NiFi 1.5.0, due to changes made to
* {@link SecurityUtil#loginKerberos(Configuration, String, String)}, which is used by this
* class to authenticate a principal with Kerberos, HBase controller services no longer
* attempt relogins explicitly. For more information, please read the documentation for
* {@link SecurityUtil#loginKerberos(Configuration, String, String)}.
* <p/>
* In previous versions of NiFi, a {@link org.apache.nifi.hadoop.KerberosTicketRenewer} was started
* when the HBase controller service was enabled. The use of a separate thread to explicitly relogin could cause
* race conditions with the implicit relogin attempts made by hadoop/HBase code on a thread that references the same
* {@link UserGroupInformation} instance. One of these threads could leave the
* {@link javax.security.auth.Subject} in {@link UserGroupInformation} to be cleared or in an unexpected state
* while the other thread is attempting to use the {@link javax.security.auth.Subject}, resulting in failed
* authentication attempts that would leave the HBase controller service in an unrecoverable state.
*
* @see SecurityUtil#loginKerberos(Configuration, String, String)
*/
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
this.connection = createConnection(context);
// connection check
if (this.connection != null) {
final Admin admin = this.connection.getAdmin();
if (admin != null) {
admin.listTableNames();
final ClusterStatus clusterStatus = admin.getClusterStatus();
if (clusterStatus != null) {
final ServerName master = clusterStatus.getMaster();
if (master != null) {
masterAddress = master.getHostAndPort();
} else {
masterAddress = null;
}
}
}
}
}
protected Connection createConnection(final ConfigurationContext context) throws IOException, InterruptedException {
final String configFiles = context.getProperty(HADOOP_CONF_FILES).evaluateAttributeExpressions().getValue();
final Configuration hbaseConfig = getConfigurationFromFiles(configFiles);
// override with any properties that are provided
if (context.getProperty(ZOOKEEPER_QUORUM).isSet()) {
hbaseConfig.set(HBASE_CONF_ZK_QUORUM, context.getProperty(ZOOKEEPER_QUORUM).evaluateAttributeExpressions().getValue());
}
if (context.getProperty(ZOOKEEPER_CLIENT_PORT).isSet()) {
hbaseConfig.set(HBASE_CONF_ZK_PORT, context.getProperty(ZOOKEEPER_CLIENT_PORT).evaluateAttributeExpressions().getValue());
}
if (context.getProperty(ZOOKEEPER_ZNODE_PARENT).isSet()) {
hbaseConfig.set(HBASE_CONF_ZNODE_PARENT, context.getProperty(ZOOKEEPER_ZNODE_PARENT).evaluateAttributeExpressions().getValue());
}
if (context.getProperty(HBASE_CLIENT_RETRIES).isSet()) {
hbaseConfig.set(HBASE_CONF_CLIENT_RETRIES, context.getProperty(HBASE_CLIENT_RETRIES).evaluateAttributeExpressions().getValue());
}
// add any dynamic properties to the HBase configuration
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
if (descriptor.isDynamic()) {
hbaseConfig.set(descriptor.getName(), entry.getValue());
}
}
if (SecurityUtil.isSecurityEnabled(hbaseConfig)) {
String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
// If the Kerberos Credentials Service is specified, we need to use its configuration, not the explicit properties for principal/keytab.
// The customValidate method ensures that only one can be set, so we know that the principal & keytab above are null.
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
if (credentialsService != null) {
principal = credentialsService.getPrincipal();
keyTab = credentialsService.getKeytab();
}
getLogger().info("HBase Security Enabled, logging in as principal {} with keytab {}", new Object[] {principal, keyTab});
ugi = SecurityUtil.loginKerberos(hbaseConfig, principal, keyTab);
getLogger().info("Successfully logged in as principal {} with keytab {}", new Object[] {principal, keyTab});
return ugi.doAs(new PrivilegedExceptionAction<Connection>() {
@Override
public Connection run() throws Exception {
return ConnectionFactory.createConnection(hbaseConfig);
}
});
} else {
getLogger().info("Simple Authentication");
return ConnectionFactory.createConnection(hbaseConfig);
}
}
protected Configuration getConfigurationFromFiles(final String configFiles) {
final Configuration hbaseConfig = HBaseConfiguration.create();
if (StringUtils.isNotBlank(configFiles)) {
for (final String configFile : configFiles.split(",")) {
hbaseConfig.addResource(new Path(configFile.trim()));
}
}
return hbaseConfig;
}
@OnDisabled
public void shutdown() {
if (connection != null) {
try {
connection.close();
} catch (final IOException ioe) {
getLogger().warn("Failed to close connection to HBase due to {}", new Object[]{ioe});
}
}
}
private List<Put> buildPuts(byte[] rowKey, List<PutColumn> columns) {
List<Put> retVal = new ArrayList<>();
try {
Put put = null;
for (final PutColumn column : columns) {
if (put == null || (put.getCellVisibility() == null && column.getVisibility() != null) || ( put.getCellVisibility() != null
&& !put.getCellVisibility().getExpression().equals(column.getVisibility())
)) {
put = new Put(rowKey);
if (column.getVisibility() != null) {
put.setCellVisibility(new CellVisibility(column.getVisibility()));
}
retVal.add(put);
}
if (column.getTimestamp() != null) {
put.addColumn(
column.getColumnFamily(),
column.getColumnQualifier(),
column.getTimestamp(),
column.getBuffer());
} else {
put.addColumn(
column.getColumnFamily(),
column.getColumnQualifier(),
column.getBuffer());
}
}
} catch (DeserializationException de) {
getLogger().error("Error writing cell visibility statement.", de);
throw new RuntimeException(de);
}
return retVal;
}
@Override
public void put(final String tableName, final Collection<PutFlowFile> puts) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
// Create one Put per row....
final Map<String, List<PutColumn>> sorted = new HashMap<>();
final List<Put> newPuts = new ArrayList<>();
for (final PutFlowFile putFlowFile : puts) {
final String rowKeyString = new String(putFlowFile.getRow(), StandardCharsets.UTF_8);
List<PutColumn> columns = sorted.get(rowKeyString);
if (columns == null) {
columns = new ArrayList<>();
sorted.put(rowKeyString, columns);
}
columns.addAll(putFlowFile.getColumns());
}
for (final Map.Entry<String, List<PutColumn>> entry : sorted.entrySet()) {
newPuts.addAll(buildPuts(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue()));
}
table.put(newPuts);
}
}
@Override
public void put(final String tableName, final byte[] rowId, final Collection<PutColumn> columns) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
table.put(buildPuts(rowId, new ArrayList(columns)));
}
}
@Override
public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
Put put = new Put(rowId);
put.addColumn(
column.getColumnFamily(),
column.getColumnQualifier(),
column.getBuffer());
return table.checkAndPut(rowId, family, qualifier, value, put);
}
}
@Override
public void delete(final String tableName, final byte[] rowId) throws IOException {
delete(tableName, rowId, null);
}
@Override
public void delete(String tableName, byte[] rowId, String visibilityLabel) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
Delete delete = new Delete(rowId);
if (!StringUtils.isEmpty(visibilityLabel)) {
delete.setCellVisibility(new CellVisibility(visibilityLabel));
}
table.delete(delete);
}
}
@Override
public void delete(String tableName, List<byte[]> rowIds) throws IOException {
delete(tableName, rowIds);
}
@Override
public void deleteCells(String tableName, List<DeleteRequest> deletes) throws IOException {
List<Delete> deleteRequests = new ArrayList<>();
for (int index = 0; index < deletes.size(); index++) {
DeleteRequest req = deletes.get(index);
Delete delete = new Delete(req.getRowId())
.addColumn(req.getColumnFamily(), req.getColumnQualifier());
if (!StringUtils.isEmpty(req.getVisibilityLabel())) {
delete.setCellVisibility(new CellVisibility(req.getVisibilityLabel()));
}
deleteRequests.add(delete);
}
batchDelete(tableName, deleteRequests);
}
@Override
public void delete(String tableName, List<byte[]> rowIds, String visibilityLabel) throws IOException {
List<Delete> deletes = new ArrayList<>();
for (int index = 0; index < rowIds.size(); index++) {
Delete delete = new Delete(rowIds.get(index));
if (!StringUtils.isBlank(visibilityLabel)) {
delete.setCellVisibility(new CellVisibility(visibilityLabel));
}
deletes.add(delete);
}
batchDelete(tableName, deletes);
}
private void batchDelete(String tableName, List<Delete> deletes) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
table.delete(deletes);
}
}
@Override
public void scan(final String tableName, final Collection<Column> columns, final String filterExpression, final long minTime, final ResultHandler handler)
throws IOException {
scan(tableName, columns, filterExpression, minTime, null, handler);
}
@Override
public void scan(String tableName, Collection<Column> columns, String filterExpression, long minTime, List<String> visibilityLabels, ResultHandler handler) throws IOException {
Filter filter = null;
if (!StringUtils.isBlank(filterExpression)) {
ParseFilter parseFilter = new ParseFilter();
filter = parseFilter.parseFilterString(filterExpression);
}
try (final Table table = connection.getTable(TableName.valueOf(tableName));
final ResultScanner scanner = getResults(table, columns, filter, minTime, visibilityLabels)) {
for (final Result result : scanner) {
final byte[] rowKey = result.getRow();
final Cell[] cells = result.rawCells();
if (cells == null) {
continue;
}
// convert HBase cells to NiFi cells
final ResultCell[] resultCells = new ResultCell[cells.length];
for (int i=0; i < cells.length; i++) {
final Cell cell = cells[i];
final ResultCell resultCell = getResultCell(cell);
resultCells[i] = resultCell;
}
// delegate to the handler
handler.handle(rowKey, resultCells);
}
}
}
@Override
public void scan(final String tableName, final byte[] startRow, final byte[] endRow, final Collection<Column> columns, List<String> authorizations, final ResultHandler handler)
throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName));
final ResultScanner scanner = getResults(table, startRow, endRow, columns, authorizations)) {
for (final Result result : scanner) {
final byte[] rowKey = result.getRow();
final Cell[] cells = result.rawCells();
if (cells == null) {
continue;
}
// convert HBase cells to NiFi cells
final ResultCell[] resultCells = new ResultCell[cells.length];
for (int i=0; i < cells.length; i++) {
final Cell cell = cells[i];
final ResultCell resultCell = getResultCell(cell);
resultCells[i] = resultCell;
}
// delegate to the handler
handler.handle(rowKey, resultCells);
}
}
}
@Override
public void scan(final String tableName, final String startRow, final String endRow, String filterExpression,
final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed,
final Collection<Column> columns, List<String> visibilityLabels, final ResultHandler handler) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName));
final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin,
timerangeMax, limitRows, isReversed, columns, visibilityLabels)) {
int cnt = 0;
final int lim = limitRows != null ? limitRows : 0;
for (final Result result : scanner) {
if (lim > 0 && ++cnt > lim){
break;
}
final byte[] rowKey = result.getRow();
final Cell[] cells = result.rawCells();
if (cells == null) {
continue;
}
// convert HBase cells to NiFi cells
final ResultCell[] resultCells = new ResultCell[cells.length];
for (int i = 0; i < cells.length; i++) {
final Cell cell = cells[i];
final ResultCell resultCell = getResultCell(cell);
resultCells[i] = resultCell;
}
// delegate to the handler
handler.handle(rowKey, resultCells);
}
}
}
//
protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax,
final Integer limitRows, final Boolean isReversed, final Collection<Column> columns, List<String> authorizations) throws IOException {
final Scan scan = new Scan();
if (!StringUtils.isBlank(startRow)){
scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8));
}
if (!StringUtils.isBlank(endRow)){
scan.setStopRow( endRow.getBytes(StandardCharsets.UTF_8));
}
if (authorizations != null && authorizations.size() > 0) {
scan.setAuthorizations(new Authorizations(authorizations));
}
Filter filter = null;
if (columns != null) {
for (Column col : columns) {
if (col.getQualifier() == null) {
scan.addFamily(col.getFamily());
} else {
scan.addColumn(col.getFamily(), col.getQualifier());
}
}
}
if (!StringUtils.isBlank(filterExpression)) {
ParseFilter parseFilter = new ParseFilter();
filter = parseFilter.parseFilterString(filterExpression);
}
if (filter != null){
scan.setFilter(filter);
}
if (timerangeMin != null && timerangeMax != null){
scan.setTimeRange(timerangeMin, timerangeMax);
}
// ->>> reserved for HBase v 2 or later
//if (limitRows != null && limitRows > 0){
// scan.setLimit(limitRows)
//}
if (isReversed != null){
scan.setReversed(isReversed);
}
return table.getScanner(scan);
}
// protected and extracted into separate method for testing
protected ResultScanner getResults(final Table table, final byte[] startRow, final byte[] endRow, final Collection<Column> columns, List<String> authorizations) throws IOException {
final Scan scan = new Scan();
scan.setStartRow(startRow);
scan.setStopRow(endRow);
if (authorizations != null && authorizations.size() > 0) {
scan.setAuthorizations(new Authorizations(authorizations));
}
if (columns != null && columns.size() > 0) {
for (Column col : columns) {
if (col.getQualifier() == null) {
scan.addFamily(col.getFamily());
} else {
scan.addColumn(col.getFamily(), col.getQualifier());
}
}
}
return table.getScanner(scan);
}
// protected and extracted into separate method for testing
protected ResultScanner getResults(final Table table, final Collection<Column> columns, final Filter filter, final long minTime, List<String> authorizations) throws IOException {
// Create a new scan. We will set the min timerange as the latest timestamp that
// we have seen so far. The minimum timestamp is inclusive, so we will get duplicates.
// We will record any cells that have the latest timestamp, so that when we scan again,
// we know to throw away those duplicates.
final Scan scan = new Scan();
scan.setTimeRange(minTime, Long.MAX_VALUE);
if (authorizations != null && authorizations.size() > 0) {
scan.setAuthorizations(new Authorizations(authorizations));
}
if (filter != null) {
scan.setFilter(filter);
}
if (columns != null) {
for (Column col : columns) {
if (col.getQualifier() == null) {
scan.addFamily(col.getFamily());
} else {
scan.addColumn(col.getFamily(), col.getQualifier());
}
}
}
return table.getScanner(scan);
}
private ResultCell getResultCell(Cell cell) {
final ResultCell resultCell = new ResultCell();
resultCell.setRowArray(cell.getRowArray());
resultCell.setRowOffset(cell.getRowOffset());
resultCell.setRowLength(cell.getRowLength());
resultCell.setFamilyArray(cell.getFamilyArray());
resultCell.setFamilyOffset(cell.getFamilyOffset());
resultCell.setFamilyLength(cell.getFamilyLength());
resultCell.setQualifierArray(cell.getQualifierArray());
resultCell.setQualifierOffset(cell.getQualifierOffset());
resultCell.setQualifierLength(cell.getQualifierLength());
resultCell.setTimestamp(cell.getTimestamp());
resultCell.setTypeByte(cell.getTypeByte());
resultCell.setSequenceId(cell.getSequenceId());
resultCell.setValueArray(cell.getValueArray());
resultCell.setValueOffset(cell.getValueOffset());
resultCell.setValueLength(cell.getValueLength());
resultCell.setTagsArray(cell.getTagsArray());
resultCell.setTagsOffset(cell.getTagsOffset());
resultCell.setTagsLength(cell.getTagsLength());
return resultCell;
}
static protected class ValidationResources {
private final String configResources;
private final Configuration configuration;
public ValidationResources(String configResources, Configuration configuration) {
this.configResources = configResources;
this.configuration = configuration;
}
public String getConfigResources() {
return configResources;
}
public Configuration getConfiguration() {
return configuration;
}
}
@Override
public byte[] toBytes(boolean b) {
return Bytes.toBytes(b);
}
@Override
public byte[] toBytes(float f) {
return Bytes.toBytes(f);
}
@Override
public byte[] toBytes(int i) {
return Bytes.toBytes(i);
}
@Override
public byte[] toBytes(long l) {
return Bytes.toBytes(l);
}
@Override
public byte[] toBytes(double d) {
return Bytes.toBytes(d);
}
@Override
public byte[] toBytes(String s) {
return Bytes.toBytes(s);
}
@Override
public byte[] toBytesBinary(String s) {
return Bytes.toBytesBinary(s);
}
@Override
public String toTransitUri(String tableName, String rowKey) {
if (connection == null) {
logger.warn("Connection has not been established, could not create a transit URI. Returning null.");
return null;
}
final String transitUriMasterAddress = StringUtils.isEmpty(masterAddress) ? "unknown" : masterAddress;
return "hbase://" + transitUriMasterAddress + "/" + tableName + (StringUtils.isEmpty(rowKey) ? "" : "/" + rowKey);
}
}

View File

@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.LookupService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
@Tags({"hbase", "record", "lookup", "service"})
@CapabilityDescription("A lookup service that retrieves one or more columns from HBase and returns them as a record. The lookup coordinates " +
"must contain 'rowKey' which will be the HBase row id.")
public class HBase_2_RecordLookupService extends AbstractControllerService implements LookupService<Record> {
static final String ROW_KEY_KEY = "rowKey";
private static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(ROW_KEY_KEY)));
static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("hbase-client-service")
.displayName("HBase Client Service")
.description("Specifies the HBase Client Controller Service to use for accessing HBase.")
.required(true)
.identifiesControllerService(HBaseClientService.class)
.build();
static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("hb-lu-table-name")
.displayName("Table Name")
.description("The name of the table where look ups will be run.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
static final PropertyDescriptor RETURN_COLUMNS = new PropertyDescriptor.Builder()
.name("hb-lu-return-cols")
.displayName("Columns")
.description("A comma-separated list of \\\"<colFamily>:<colQualifier>\\\" pairs to return when scanning. " +
"To return all columns for a given family, leave off the qualifier such as \\\"<colFamily1>,<colFamily2>\\\".")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("hb-lu-charset")
.displayName("Character Set")
.description("Specifies the character set used to decode bytes retrieved from HBase.")
.required(true)
.defaultValue("UTF-8")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
static final List<PropertyDescriptor> PROPERTIES;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(HBASE_CLIENT_SERVICE);
props.add(TABLE_NAME);
props.add(AUTHORIZATIONS);
props.add(RETURN_COLUMNS);
props.add(CHARSET);
PROPERTIES = Collections.unmodifiableList(props);
}
private String tableName;
private List<Column> columns;
private Charset charset;
private HBaseClientService hBaseClientService;
private List<String> authorizations;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
if (coordinates.get(ROW_KEY_KEY) == null) {
return Optional.empty();
}
final String rowKey = coordinates.get(ROW_KEY_KEY).toString();
if (StringUtils.isBlank(rowKey)) {
return Optional.empty();
}
final byte[] rowKeyBytes = rowKey.getBytes(StandardCharsets.UTF_8);
try {
final Map<String, Object> values = new HashMap<>();
hBaseClientService.scan(tableName, rowKeyBytes, rowKeyBytes, columns, authorizations, (byte[] row, ResultCell[] resultCells) -> {
for (final ResultCell cell : resultCells) {
final byte[] qualifier = Arrays.copyOfRange(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierOffset() + cell.getQualifierLength());
final byte[] value = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength());
values.put(new String(qualifier, charset), new String(value, charset));
}
});
if (values.size() > 0) {
final List<RecordField> fields = new ArrayList<>();
for (String key : values.keySet()) {
fields.add(new RecordField(key, RecordFieldType.STRING.getDataType()));
}
final RecordSchema schema = new SimpleRecordSchema(fields);
return Optional.ofNullable(new MapRecord(schema, values));
} else {
return Optional.empty();
}
} catch (IOException e) {
getLogger().error("Error occurred loading {}", new Object[] { coordinates.get("rowKey") }, e);
throw new LookupFailureException(e);
}
}
@Override
public Class<?> getValueType() {
return Record.class;
}
@Override
public Set<String> getRequiredKeys() {
return REQUIRED_KEYS;
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, InterruptedException {
this.hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
this.tableName = context.getProperty(TABLE_NAME).getValue();
this.columns = getColumns(context.getProperty(RETURN_COLUMNS).getValue());
this.charset = Charset.forName(context.getProperty(CHARSET).getValue());
this.authorizations = VisibilityLabelUtils.getAuthorizations(context);
}
@OnDisabled
public void onDisabled() {
this.hBaseClientService = null;
this.tableName = null;
this.columns = null;
this.charset = null;
}
private List<Column> getColumns(final String columnsValue) {
final String[] columns = (columnsValue == null || columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
final List<Column> columnsList = new ArrayList<>();
for (final String column : columns) {
if (column.contains(":")) {
final String[] parts = column.trim().split(":");
final byte[] cf = parts[0].getBytes(StandardCharsets.UTF_8);
final byte[] cq = parts[1].getBytes(StandardCharsets.UTF_8);
columnsList.add(new Column(cf, cq));
} else {
final byte[] cf = column.trim().getBytes(StandardCharsets.UTF_8);
columnsList.add(new Column(cf, null));
}
}
return columnsList;
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
class VisibilityLabelUtils {
static final PropertyDescriptor AUTHORIZATIONS = new PropertyDescriptor.Builder()
.name("hb-lu-authorizations")
.displayName("Authorizations")
.description("The list of authorization tokens to be used with cell visibility if it is enabled. These will be used to " +
"override the default authorization list for the user accessing HBase.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static List<String> getAuthorizations(ConfigurationContext context) {
List<String> tokens = new ArrayList<>();
String authorizationString = context.getProperty(AUTHORIZATIONS).isSet()
? context.getProperty(AUTHORIZATIONS).getValue()
: "";
if (!StringUtils.isEmpty(authorizationString)) {
tokens = Arrays.asList(authorizationString.split(",[\\s]*"));
}
return tokens;
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase.validate;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.util.StandardValidators;
public class ConfigFilesValidator implements Validator {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
final String[] filenames = value.split(",");
for (final String filename : filenames) {
final ValidationResult result = StandardValidators.FILE_EXISTS_VALIDATOR.validate(subject, filename.trim(), context);
if (!result.isValid()) {
return result;
}
}
return new ValidationResult.Builder().subject(subject).input(value).valid(true).build();
}
}

View File

@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.hbase.HBase_2_ClientService
org.apache.nifi.hbase.HBase_2_ClientMapCacheService
org.apache.nifi.hbase.HBase_2_RecordLookupService

View File

@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.scan.Column;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.mockito.Mockito.when;
/**
* Override methods to create a mock service that can return staged data
*/
public class MockHBaseClientService extends HBase_2_ClientService {
private Table table;
private String family;
private List<Result> results = new ArrayList<>();
private KerberosProperties kerberosProperties;
public MockHBaseClientService(final Table table, final String family, final KerberosProperties kerberosProperties) {
this.table = table;
this.family = family;
this.kerberosProperties = kerberosProperties;
}
@Override
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
return kerberosProperties;
}
protected void setKerberosProperties(KerberosProperties properties) {
this.kerberosProperties = properties;
}
public void addResult(final String rowKey, final Map<String, String> cells, final long timestamp) {
final byte[] rowArray = rowKey.getBytes(StandardCharsets.UTF_8);
final Cell[] cellArray = new Cell[cells.size()];
int i = 0;
for (final Map.Entry<String, String> cellEntry : cells.entrySet()) {
final Cell cell = Mockito.mock(Cell.class);
when(cell.getRowArray()).thenReturn(rowArray);
when(cell.getRowOffset()).thenReturn(0);
when(cell.getRowLength()).thenReturn((short) rowArray.length);
final String cellValue = cellEntry.getValue();
final byte[] valueArray = cellValue.getBytes(StandardCharsets.UTF_8);
when(cell.getValueArray()).thenReturn(valueArray);
when(cell.getValueOffset()).thenReturn(0);
when(cell.getValueLength()).thenReturn(valueArray.length);
final byte[] familyArray = family.getBytes(StandardCharsets.UTF_8);
when(cell.getFamilyArray()).thenReturn(familyArray);
when(cell.getFamilyOffset()).thenReturn(0);
when(cell.getFamilyLength()).thenReturn((byte) familyArray.length);
final String qualifier = cellEntry.getKey();
final byte[] qualifierArray = qualifier.getBytes(StandardCharsets.UTF_8);
when(cell.getQualifierArray()).thenReturn(qualifierArray);
when(cell.getQualifierOffset()).thenReturn(0);
when(cell.getQualifierLength()).thenReturn(qualifierArray.length);
when(cell.getTimestamp()).thenReturn(timestamp);
cellArray[i++] = cell;
}
final Result result = Mockito.mock(Result.class);
when(result.getRow()).thenReturn(rowArray);
when(result.rawCells()).thenReturn(cellArray);
results.add(result);
}
@Override
public void put(final String tableName, final byte[] rowId, final Collection<PutColumn> columns) throws IOException {
Put put = new Put(rowId);
Map<String, String> map = new HashMap<String, String>();
for (final PutColumn column : columns) {
put.addColumn(
column.getColumnFamily(),
column.getColumnQualifier(),
column.getBuffer());
map.put(new String(column.getColumnQualifier()), new String(column.getBuffer()));
}
table.put(put);
addResult(new String(rowId), map, 1);
}
@Override
public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException {
for (Result result : results) {
if (Arrays.equals(result.getRow(), rowId)) {
Cell[] cellArray = result.rawCells();
for (Cell cell : cellArray) {
if (Arrays.equals(cell.getFamilyArray(), family) && Arrays.equals(cell.getQualifierArray(), qualifier)) {
if (value == null || Arrays.equals(cell.getValueArray(), value)) {
return false;
}
}
}
}
}
final List<PutColumn> putColumns = new ArrayList<PutColumn>();
putColumns.add(column);
put(tableName, rowId, putColumns);
return true;
}
protected ResultScanner getResults(Table table, byte[] startRow, byte[] endRow, Collection<Column> columns, List<String> labels) throws IOException {
final ResultScanner scanner = Mockito.mock(ResultScanner.class);
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
return scanner;
}
@Override
protected ResultScanner getResults(Table table, Collection<Column> columns, Filter filter, long minTime, List<String> labels) throws IOException {
final ResultScanner scanner = Mockito.mock(ResultScanner.class);
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
return scanner;
}
protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax,
final Integer limitRows, final Boolean isReversed, final Collection<Column> columns) throws IOException {
final ResultScanner scanner = Mockito.mock(ResultScanner.class);
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
return scanner;
}
@Override
protected Connection createConnection(ConfigurationContext context) throws IOException {
Connection connection = Mockito.mock(Connection.class);
Mockito.when(connection.getTable(table.getName())).thenReturn(table);
return connection;
}
}

View File

@ -0,0 +1,313 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestHBase_2_ClientMapCacheService {
private KerberosProperties kerberosPropsWithFile;
private KerberosProperties kerberosPropsWithoutFile;
private Serializer<String> stringSerializer = new StringSerializer();
private Deserializer<String> stringDeserializer = new StringDeserializer();
@Before
public void setup() {
// needed for calls to UserGroupInformation.setConfiguration() to work when passing in
// config with Kerberos authentication enabled
System.setProperty("java.security.krb5.realm", "nifi.com");
System.setProperty("java.security.krb5.kdc", "nifi.kdc");
kerberosPropsWithFile = new KerberosProperties(new File("src/test/resources/krb5.conf"));
kerberosPropsWithoutFile = new KerberosProperties(null);
}
private final String tableName = "nifi";
private final String columnFamily = "family1";
private final String columnQualifier = "qualifier1";
@Test
public void testPut() throws InitializationException, IOException {
final String row = "row1";
final String content = "content1";
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table so we can verify the put operations later
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final MockHBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService);
runner.assertValid(cacheService);
// try to put a single cell
final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
.asControllerService(DistributedMapCacheClient.class);
hBaseCacheService.put( row, content, stringSerializer, stringSerializer);
// verify only one call to put was made
ArgumentCaptor<Put> capture = ArgumentCaptor.forClass(Put.class);
verify(table, times(1)).put(capture.capture());
verifyPut(row, columnFamily, columnQualifier, content, capture.getValue());
}
@Test
public void testGet() throws InitializationException, IOException {
final String row = "row1";
final String content = "content1";
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table so we can verify the put operations later
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final MockHBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService);
runner.assertValid(cacheService);
final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
.asControllerService(DistributedMapCacheClient.class);
hBaseCacheService.put( row, content, stringSerializer, stringSerializer);
final String result = hBaseCacheService.get(row, stringSerializer, stringDeserializer);
assertEquals( content, result);
}
@Test
public void testContainsKey() throws InitializationException, IOException {
final String row = "row1";
final String content = "content1";
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table so we can verify the put operations later
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final MockHBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService);
runner.assertValid(cacheService);
final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
.asControllerService(DistributedMapCacheClient.class);
assertFalse( hBaseCacheService.containsKey(row , stringSerializer ) );
hBaseCacheService.put( row, content, stringSerializer, stringSerializer);
assertTrue( hBaseCacheService.containsKey(row, stringSerializer) );
}
@Test
public void testPutIfAbsent() throws InitializationException, IOException {
final String row = "row1";
final String content = "content1";
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table so we can verify the put operations later
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final MockHBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService);
runner.assertValid(cacheService);
final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
.asControllerService(DistributedMapCacheClient.class);
assertTrue( hBaseCacheService.putIfAbsent( row, content, stringSerializer, stringSerializer));
// verify only one call to put was made
ArgumentCaptor<Put> capture = ArgumentCaptor.forClass(Put.class);
verify(table, times(1)).put(capture.capture());
verifyPut(row, columnFamily, columnQualifier, content, capture.getValue());
assertFalse( hBaseCacheService.putIfAbsent( row, content, stringSerializer, stringSerializer));
verify(table, times(1)).put(capture.capture());
}
@Test
public void testGetAndPutIfAbsent() throws InitializationException, IOException {
final String row = "row1";
final String content = "content1";
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table so we can verify the put operations later
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final MockHBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
final DistributedMapCacheClient cacheService = configureHBaseCacheService(runner, hBaseClientService);
runner.assertValid(cacheService);
final DistributedMapCacheClient hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
.asControllerService(DistributedMapCacheClient.class);
assertNull( hBaseCacheService.getAndPutIfAbsent( row, content, stringSerializer, stringSerializer, stringDeserializer));
// verify only one call to put was made
ArgumentCaptor<Put> capture = ArgumentCaptor.forClass(Put.class);
verify(table, times(1)).put(capture.capture());
verifyPut(row, columnFamily, columnQualifier, content, capture.getValue());
final String result = hBaseCacheService.getAndPutIfAbsent( row, content, stringSerializer, stringSerializer, stringDeserializer);
verify(table, times(1)).put(capture.capture());
assertEquals( result, content);
}
private MockHBaseClientService configureHBaseClientService(final TestRunner runner, final Table table) throws InitializationException {
final MockHBaseClientService service = new MockHBaseClientService(table, "family1", kerberosPropsWithFile);
runner.addControllerService("hbaseClient", service);
runner.setProperty(service, HBase_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml");
runner.enableControllerService(service);
runner.setProperty(TestProcessor.HBASE_CLIENT_SERVICE, "hbaseClient");
return service;
}
private DistributedMapCacheClient configureHBaseCacheService(final TestRunner runner, final HBaseClientService service) throws InitializationException {
final HBase_2_ClientMapCacheService cacheService = new HBase_2_ClientMapCacheService();
runner.addControllerService("hbaseCache", cacheService);
runner.setProperty(cacheService, HBase_2_ClientMapCacheService.HBASE_CLIENT_SERVICE, "hbaseClient");
runner.setProperty(cacheService, HBase_2_ClientMapCacheService.HBASE_CACHE_TABLE_NAME, tableName);
runner.setProperty(cacheService, HBase_2_ClientMapCacheService.HBASE_COLUMN_FAMILY, columnFamily);
runner.setProperty(cacheService, HBase_2_ClientMapCacheService.HBASE_COLUMN_QUALIFIER, columnQualifier);
runner.enableControllerService(cacheService);
runner.setProperty(TestProcessor.HBASE_CACHE_SERVICE,"hbaseCache");
return cacheService;
}
private void verifyResultCell(final ResultCell result, final String cf, final String cq, final String val) {
final String colFamily = new String(result.getFamilyArray(), result.getFamilyOffset(), result.getFamilyLength());
assertEquals(cf, colFamily);
final String colQualifier = new String(result.getQualifierArray(), result.getQualifierOffset(), result.getQualifierLength());
assertEquals(cq, colQualifier);
final String value = new String(result.getValueArray(), result.getValueOffset(), result.getValueLength());
assertEquals(val, value);
}
private void verifyPut(String row, String columnFamily, String columnQualifier, String content, Put put) {
assertEquals(row, new String(put.getRow()));
NavigableMap<byte [], List<Cell>> familyCells = put.getFamilyCellMap();
assertEquals(1, familyCells.size());
Map.Entry<byte[], List<Cell>> entry = familyCells.firstEntry();
assertEquals(columnFamily, new String(entry.getKey()));
assertEquals(1, entry.getValue().size());
Cell cell = entry.getValue().get(0);
assertEquals(columnQualifier, new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
assertEquals(content, new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
}
private static class StringSerializer implements Serializer<String> {
@Override
public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
out.write(value.getBytes(StandardCharsets.UTF_8));
}
}
private static class StringDeserializer implements Deserializer<String> {
@Override
public String deserialize(byte[] input) throws DeserializationException, IOException{
return new String(input);
}
}
}

View File

@ -0,0 +1,450 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestHBase_2_ClientService {
static final String COL_FAM = "nifi1";
private KerberosProperties kerberosPropsWithFile;
private KerberosProperties kerberosPropsWithoutFile;
@Before
public void setup() {
// needed for calls to UserGroupInformation.setConfiguration() to work when passing in
// config with Kerberos authentication enabled
System.setProperty("java.security.krb5.realm", "nifi.com");
System.setProperty("java.security.krb5.kdc", "nifi.kdc");
kerberosPropsWithFile = new KerberosProperties(new File("src/test/resources/krb5.conf"));
kerberosPropsWithoutFile = new KerberosProperties(null);
}
@Test
public void testCustomValidate() throws InitializationException, IOException {
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
final String tableName = "nifi";
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// no conf file or zk properties so should be invalid
MockHBaseClientService service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile);
runner.addControllerService("hbaseClientService", service);
runner.assertNotValid(service);
runner.removeControllerService(service);
runner.setVariable("hadoop-conf-files", "src/test/resources/hbase-site.xml");
runner.setVariable("zk-quorum", "localhost");
runner.setVariable("zk-client-port", "2181");
runner.setVariable("zk-znode", "/hbase");
// conf file with no zk properties should be valid
service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile);
runner.addControllerService("hbaseClientService", service);
runner.setProperty(service, HBase_2_ClientService.HADOOP_CONF_FILES, "${hadoop-conf-files}");
runner.enableControllerService(service);
runner.assertValid(service);
runner.removeControllerService(service);
// only quorum and no conf file should be invalid
service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile);
runner.addControllerService("hbaseClientService", service);
runner.setProperty(service, HBase_2_ClientService.ZOOKEEPER_QUORUM, "${zk-quorum}");
runner.assertNotValid(service);
runner.removeControllerService(service);
// quorum and port, no znode, no conf file, should be invalid
service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile);
runner.addControllerService("hbaseClientService", service);
runner.setProperty(service, HBase_2_ClientService.ZOOKEEPER_QUORUM, "${zk-quorum}");
runner.setProperty(service, HBase_2_ClientService.ZOOKEEPER_CLIENT_PORT, "${zk-client-port}");
runner.assertNotValid(service);
runner.removeControllerService(service);
// quorum, port, and znode, no conf file, should be valid
service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile);
runner.addControllerService("hbaseClientService", service);
runner.setProperty(service, HBase_2_ClientService.ZOOKEEPER_QUORUM, "${zk-quorum}");
runner.setProperty(service, HBase_2_ClientService.ZOOKEEPER_CLIENT_PORT, "${zk-client-port}");
runner.setProperty(service, HBase_2_ClientService.ZOOKEEPER_ZNODE_PARENT, "${zk-znode}");
runner.enableControllerService(service);
runner.assertValid(service);
runner.removeControllerService(service);
// quorum and port with conf file should be valid
service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile);
runner.addControllerService("hbaseClientService", service);
runner.setProperty(service, HBase_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml");
runner.setProperty(service, HBase_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
runner.setProperty(service, HBase_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181");
runner.enableControllerService(service);
runner.assertValid(service);
runner.removeControllerService(service);
// Kerberos - principal with non-set keytab and only hbase-site-security - valid because we need core-site-security to turn on security
service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile);
runner.addControllerService("hbaseClientService", service);
runner.setProperty(service, HBase_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site-security.xml");
runner.setProperty(service, kerberosPropsWithFile.getKerberosPrincipal(), "test@REALM");
runner.enableControllerService(service);
runner.assertValid(service);
// Kerberos - principal with non-set keytab and both config files
runner.disableControllerService(service);
runner.setProperty(service, HBase_2_ClientService.HADOOP_CONF_FILES,
"src/test/resources/hbase-site-security.xml, src/test/resources/core-site-security.xml");
runner.assertNotValid(service);
// Kerberos - add valid options
runner.setProperty(service, kerberosPropsWithFile.getKerberosKeytab(), "src/test/resources/fake.keytab");
runner.setProperty(service, kerberosPropsWithFile.getKerberosPrincipal(), "test@REALM");
runner.enableControllerService(service);
runner.assertValid(service);
// Kerberos - add invalid non-existent keytab file
runner.disableControllerService(service);
runner.setProperty(service, kerberosPropsWithFile.getKerberosKeytab(), "src/test/resources/missing.keytab");
runner.assertNotValid(service);
// Kerberos - add invalid principal
runner.setProperty(service, kerberosPropsWithFile.getKerberosKeytab(), "src/test/resources/fake.keytab");
runner.setProperty(service, kerberosPropsWithFile.getKerberosPrincipal(), "");
runner.assertNotValid(service);
// Kerberos - valid props but the KerberosProperties has a null Kerberos config file so be invalid
service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithoutFile);
runner.addControllerService("hbaseClientService", service);
runner.setProperty(service, HBase_2_ClientService.HADOOP_CONF_FILES,
"src/test/resources/hbase-site-security.xml, src/test/resources/core-site-security.xml");
runner.setProperty(service, kerberosPropsWithoutFile.getKerberosKeytab(), "src/test/resources/fake.keytab");
runner.setProperty(service, kerberosPropsWithoutFile.getKerberosPrincipal(), "test@REALM");
runner.assertNotValid(service);
}
@Test
public void testSinglePut() throws InitializationException, IOException {
final String tableName = "nifi";
final String row = "row1";
final String columnFamily = "family1";
final String columnQualifier = "qualifier1";
final String content = "content1";
final Collection<PutColumn> columns = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8), columnQualifier.getBytes(StandardCharsets.UTF_8),
content.getBytes(StandardCharsets.UTF_8)));
final PutFlowFile putFlowFile = new PutFlowFile(tableName, row.getBytes(StandardCharsets.UTF_8), columns, null);
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table so we can verify the put operations later
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final HBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
// try to put a single cell
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
hBaseClientService.put(tableName, Arrays.asList(putFlowFile));
// verify only one call to put was made
ArgumentCaptor<List> capture = ArgumentCaptor.forClass(List.class);
verify(table, times(1)).put(capture.capture());
// verify only one put was in the list of puts
final List<Put> puts = capture.getValue();
assertEquals(1, puts.size());
verifyPut(row, columnFamily, columnQualifier, content, puts.get(0));
}
@Test
public void testMultiplePutsSameRow() throws IOException, InitializationException {
final String tableName = "nifi";
final String row = "row1";
final String columnFamily = "family1";
final String columnQualifier = "qualifier1";
final String content1 = "content1";
final String content2 = "content2";
final Collection<PutColumn> columns1 = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
columnQualifier.getBytes(StandardCharsets.UTF_8),
content1.getBytes(StandardCharsets.UTF_8)));
final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row.getBytes(StandardCharsets.UTF_8), columns1, null);
final Collection<PutColumn> columns2 = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
columnQualifier.getBytes(StandardCharsets.UTF_8),
content2.getBytes(StandardCharsets.UTF_8)));
final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row.getBytes(StandardCharsets.UTF_8), columns2, null);
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table so we can verify the put operations later
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final HBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
// try to put a multiple cells for the same row
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
hBaseClientService.put(tableName, Arrays.asList(putFlowFile1, putFlowFile2));
// verify put was only called once
ArgumentCaptor<List> capture = ArgumentCaptor.forClass(List.class);
verify(table, times(1)).put(capture.capture());
// verify there was only one put in the list of puts
final List<Put> puts = capture.getValue();
assertEquals(1, puts.size());
// verify two cells were added to this one put operation
final NavigableMap<byte[], List<Cell>> familyCells = puts.get(0).getFamilyCellMap();
Map.Entry<byte[], List<Cell>> entry = familyCells.firstEntry();
assertEquals(2, entry.getValue().size());
}
@Test
public void testMultiplePutsDifferentRow() throws IOException, InitializationException {
final String tableName = "nifi";
final String row1 = "row1";
final String row2 = "row2";
final String columnFamily = "family1";
final String columnQualifier = "qualifier1";
final String content1 = "content1";
final String content2 = "content2";
final Collection<PutColumn> columns1 = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
columnQualifier.getBytes(StandardCharsets.UTF_8),
content1.getBytes(StandardCharsets.UTF_8)));
final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row1.getBytes(StandardCharsets.UTF_8), columns1, null);
final Collection<PutColumn> columns2 = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
columnQualifier.getBytes(StandardCharsets.UTF_8),
content2.getBytes(StandardCharsets.UTF_8)));
final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row2.getBytes(StandardCharsets.UTF_8), columns2, null);
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table so we can verify the put operations later
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final HBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
// try to put a multiple cells with different rows
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
hBaseClientService.put(tableName, Arrays.asList(putFlowFile1, putFlowFile2));
// verify put was only called once
ArgumentCaptor<List> capture = ArgumentCaptor.forClass(List.class);
verify(table, times(1)).put(capture.capture());
// verify there were two puts in the list
final List<Put> puts = capture.getValue();
assertEquals(2, puts.size());
}
@Test
public void testScan() throws InitializationException, IOException {
final String tableName = "nifi";
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table so we can verify the put operations later
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final MockHBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
// stage some results in the mock service...
final long now = System.currentTimeMillis();
final Map<String, String> cells = new HashMap<>();
cells.put("greeting", "hello");
cells.put("name", "nifi");
service.addResult("row0", cells, now - 2);
service.addResult("row1", cells, now - 1);
service.addResult("row2", cells, now - 1);
service.addResult("row3", cells, now);
// perform a scan and verify the four rows were returned
final CollectingResultHandler handler = new CollectingResultHandler();
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
hBaseClientService.scan(tableName, new ArrayList<Column>(), null, now, handler);
assertEquals(4, handler.results.size());
// get row0 using the row id and verify it has 2 cells
final ResultCell[] results = handler.results.get("row0");
assertNotNull(results);
assertEquals(2, results.length);
verifyResultCell(results[0], COL_FAM, "greeting", "hello");
verifyResultCell(results[1], COL_FAM, "name", "nifi");
}
@Test
public void testScanWithValidFilter() throws InitializationException, IOException {
final String tableName = "nifi";
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table so we can verify the put operations later
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final MockHBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
// perform a scan and verify the four rows were returned
final CollectingResultHandler handler = new CollectingResultHandler();
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
// make sure we parse the filter expression without throwing an exception
final String filter = "PrefixFilter ('Row') AND PageFilter (1) AND FirstKeyOnlyFilter ()";
hBaseClientService.scan(tableName, new ArrayList<Column>(), filter, System.currentTimeMillis(), handler);
}
@Test(expected = IllegalArgumentException.class)
public void testScanWithInvalidFilter() throws InitializationException, IOException {
final String tableName = "nifi";
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table so we can verify the put operations later
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final MockHBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
// perform a scan and verify the four rows were returned
final CollectingResultHandler handler = new CollectingResultHandler();
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
// this should throw IllegalArgumentException
final String filter = "this is not a filter";
hBaseClientService.scan(tableName, new ArrayList<Column>(), filter, System.currentTimeMillis(), handler);
}
private MockHBaseClientService configureHBaseClientService(final TestRunner runner, final Table table) throws InitializationException {
final MockHBaseClientService service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile);
runner.addControllerService("hbaseClient", service);
runner.setProperty(service, HBase_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml");
runner.enableControllerService(service);
runner.setProperty(TestProcessor.HBASE_CLIENT_SERVICE, "hbaseClient");
return service;
}
private void verifyResultCell(final ResultCell result, final String cf, final String cq, final String val) {
final String colFamily = new String(result.getFamilyArray(), result.getFamilyOffset(), result.getFamilyLength());
assertEquals(cf, colFamily);
final String colQualifier = new String(result.getQualifierArray(), result.getQualifierOffset(), result.getQualifierLength());
assertEquals(cq, colQualifier);
final String value = new String(result.getValueArray(), result.getValueOffset(), result.getValueLength());
assertEquals(val, value);
}
private void verifyPut(String row, String columnFamily, String columnQualifier, String content, Put put) {
assertEquals(row, new String(put.getRow()));
NavigableMap<byte [], List<Cell>> familyCells = put.getFamilyCellMap();
assertEquals(1, familyCells.size());
Map.Entry<byte[], List<Cell>> entry = familyCells.firstEntry();
assertEquals(columnFamily, new String(entry.getKey()));
assertEquals(1, entry.getValue().size());
Cell cell = entry.getValue().get(0);
assertEquals(columnQualifier, new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
assertEquals(content, new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
}
// handler that saves results for verification
private static final class CollectingResultHandler implements ResultHandler {
Map<String,ResultCell[]> results = new LinkedHashMap<>();
@Override
public void handle(byte[] row, ResultCell[] resultCells) {
final String rowStr = new String(row, StandardCharsets.UTF_8);
results.put(rowStr, resultCells);
}
}
}

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.mockito.Mockito.when;
public class TestHBase_2_RecordLookupService {
static final String TABLE_NAME = "guids";
static final String ROW = "row1";
static final String COLS = "cf1:cq1,cf2:cq2";
private TestRunner runner;
private HBase_2_RecordLookupService lookupService;
private MockHBaseClientService clientService;
private TestRecordLookupProcessor testLookupProcessor;
@Before
public void before() throws Exception {
testLookupProcessor = new TestRecordLookupProcessor();
runner = TestRunners.newTestRunner(testLookupProcessor);
// setup mock HBaseClientService
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(TABLE_NAME));
final KerberosProperties kerberosProperties = new KerberosProperties(new File("src/test/resources/krb5.conf"));
clientService = new MockHBaseClientService(table, "family", kerberosProperties);
runner.addControllerService("clientService", clientService);
runner.setProperty(clientService, HBase_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml");
runner.enableControllerService(clientService);
// setup HBase LookupService
lookupService = new HBase_2_RecordLookupService();
runner.addControllerService("lookupService", lookupService);
runner.setProperty(lookupService, HBase_2_RecordLookupService.HBASE_CLIENT_SERVICE, "clientService");
runner.setProperty(lookupService, HBase_2_RecordLookupService.TABLE_NAME, TABLE_NAME);
runner.enableControllerService(lookupService);
// setup test processor
runner.setProperty(TestRecordLookupProcessor.HBASE_LOOKUP_SERVICE, "lookupService");
runner.setProperty(TestRecordLookupProcessor.HBASE_ROW, ROW);
}
@Test
public void testSuccessfulLookupAllColumns() {
// setup some staged data in the mock client service
final Map<String,String> cells = new HashMap<>();
cells.put("cq1", "v1");
cells.put("cq2", "v2");
clientService.addResult("row1", cells, System.currentTimeMillis());
// run the processor
runner.enqueue("trigger flow file");
runner.run();
runner.assertAllFlowFilesTransferred(TestRecordLookupProcessor.REL_SUCCESS);
final List<Record> records = testLookupProcessor.getLookedupRecords();
Assert.assertNotNull(records);
Assert.assertEquals(1, records.size());
final Record record = records.get(0);
Assert.assertEquals("v1", record.getAsString("cq1"));
Assert.assertEquals("v2", record.getAsString("cq2"));
}
@Test
public void testLookupWithNoResults() {
// run the processor
runner.enqueue("trigger flow file");
runner.run();
runner.assertAllFlowFilesTransferred(TestRecordLookupProcessor.REL_FAILURE);
final List<Record> records = testLookupProcessor.getLookedupRecords();
Assert.assertNotNull(records);
Assert.assertEquals(0, records.size());
}
@Test
public void testLookupWhenMissingRowKeyCoordinate() {
runner.removeProperty(TestRecordLookupProcessor.HBASE_ROW);
// run the processor
runner.enqueue("trigger flow file");
runner.run();
runner.assertAllFlowFilesTransferred(TestRecordLookupProcessor.REL_FAILURE);
final List<Record> records = testLookupProcessor.getLookedupRecords();
Assert.assertNotNull(records);
Assert.assertEquals(0, records.size());
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import java.util.ArrayList;
import java.util.List;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.hbase.HBaseClientService;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
public class TestProcessor extends AbstractProcessor {
static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("HBase Client Service")
.description("HBaseClientService")
.identifiesControllerService(HBaseClientService.class)
.required(true)
.build();
static final PropertyDescriptor HBASE_CACHE_SERVICE = new PropertyDescriptor.Builder()
.name("HBase Cache Service")
.description("HBaseCacheService")
.identifiesControllerService(DistributedMapCacheClient.class)
.required(true)
.build();
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> propDescs = new ArrayList<>();
propDescs.add(HBASE_CLIENT_SERVICE);
propDescs.add(HBASE_CACHE_SERVICE);
return propDescs;
}
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.LookupService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.record.Record;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
public class TestRecordLookupProcessor extends AbstractProcessor {
static final PropertyDescriptor HBASE_LOOKUP_SERVICE = new PropertyDescriptor.Builder()
.name("HBase Lookup Service")
.description("HBaseLookupService")
.identifiesControllerService(LookupService.class)
.required(true)
.build();
static final PropertyDescriptor HBASE_ROW = new PropertyDescriptor.Builder()
.name("HBase Row Id")
.description("The Row Id to Lookup.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All success FlowFiles are routed to this relationship")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("All failed FlowFiles are routed to this relationship")
.build();
private List<Record> lookedupRecords = new ArrayList<>();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> propDescs = new ArrayList<>();
propDescs.add(HBASE_LOOKUP_SERVICE);
propDescs.add(HBASE_ROW);
return propDescs;
}
@Override
public Set<Relationship> getRelationships() {
Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
return relationships;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final String rowKey = context.getProperty(HBASE_ROW).getValue();
final Map<String,Object> coordinates = new HashMap<>();
coordinates.put(HBase_2_RecordLookupService.ROW_KEY_KEY, rowKey);
final LookupService<Record> lookupService = context.getProperty(HBASE_LOOKUP_SERVICE).asControllerService(LookupService.class);
try {
final Optional<Record> record = lookupService.lookup(coordinates);
if (record.isPresent()) {
lookedupRecords.add(record.get());
session.transfer(flowFile, REL_SUCCESS);
} else {
session.transfer(flowFile, REL_FAILURE);
}
} catch (LookupFailureException e) {
session.transfer(flowFile, REL_FAILURE);
}
}
public List<Record> getLookedupRecords() {
return new ArrayList<>(lookedupRecords);
}
public void clearLookedupRecords() {
this.lookedupRecords.clear();
}
}

View File

@ -0,0 +1,30 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://hbase</value>
</property>
<property>
<name>hadoop.security.authentication</name>
<value>kerberos</value>
</property>
<property>
<name>hadoop.security.authorization</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,22 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://hbase</value>
</property>
</configuration>

View File

@ -0,0 +1,30 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://hbase</value>
</property>
<property>
<name>hbase.security.authentication</name>
<value>kerberos</value>
</property>
<property>
<name>hbase.security.authorization</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,22 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://hbase</value>
</property>
</configuration>

View File

@ -0,0 +1,63 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services</artifactId>
<version>1.9.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase_2-client-service-bundle</artifactId>
<version>1.9.0-SNAPSHOT</version>
<packaging>pom</packaging>
<properties>
<hbase.version>2.1.1</hbase.version>
</properties>
<modules>
<module>nifi-hbase_2-client-service</module>
<module>nifi-hbase_2-client-service-nar</module>
</modules>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes combine.children="append">
<exclude>src/test/resources/fake.keytab</exclude>
<exclude>src/test/resources/krb5.conf</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
<dependencyManagement>
<dependencies>
<dependency>
<!-- Explicitly force Netty to 3.6.9 due to CVE-2014-0193 -->
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.6.9.Final</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@ -37,6 +37,7 @@
<module>nifi-dbcp-service-bundle</module>
<module>nifi-hbase-client-service-api</module>
<module>nifi-hbase_1_1_2-client-service-bundle</module>
<module>nifi-hbase_2-client-service-bundle</module>
<module>nifi-schema-registry-service-api</module>
<module>nifi-record-serialization-service-api</module>
<module>nifi-record-serialization-services-bundle</module>