diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 7caa9c2d03..82b9a862f4 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -882,20 +882,6 @@ language governing permissions and limitations under the License. -->
-        <profile>
-            <id>include-elasticsearch-5-bundle</id>
-            <activation>
-                <activeByDefault>false</activeByDefault>
-            </activation>
-            <dependencies>
-                <dependency>
-                    <groupId>org.apache.nifi</groupId>
-                    <artifactId>nifi-elasticsearch-5-nar</artifactId>
-                    <version>1.16.0-SNAPSHOT</version>
-                    <type>nar</type>
-                </dependency>
-            </dependencies>
-        </profile>
         <profile>
             <id>include-graph</id>
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-nar/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-nar/pom.xml
deleted file mode 100644
index 3b5b29ef7e..0000000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-nar/pom.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <artifactId>nifi-elasticsearch-bundle</artifactId>
-        <groupId>org.apache.nifi</groupId>
-        <version>1.16.0-SNAPSHOT</version>
-    </parent>
-
-    <groupId>org.apache.nifi</groupId>
-    <artifactId>nifi-elasticsearch-5-nar</artifactId>
-    <packaging>nar</packaging>
-
-    <properties>
-        <maven.javadoc.skip>true</maven.javadoc.skip>
-        <source.skip>true</source.skip>
-        <lucene.version>6.2.1</lucene.version>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-standard-services-api-nar</artifactId>
-            <version>1.16.0-SNAPSHOT</version>
-            <type>nar</type>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-elasticsearch-5-processors</artifactId>
-        </dependency>
-    </dependencies>
-</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-nar/src/main/resources/META-INF/LICENSE
deleted file mode 100644
index ee59a5d3d4..0000000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-nar/src/main/resources/META-INF/LICENSE
+++ /dev/null
@@ -1,285 +0,0 @@
-
- Apache License
- Version 2.0, January 2004
- http://www.apache.org/licenses/
-
- TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
- 1. Definitions.
-
- "License" shall mean the terms and conditions for use, reproduction,
- and distribution as defined by Sections 1 through 9 of this document.
-
- "Licensor" shall mean the copyright owner or entity authorized by
- the copyright owner that is granting the License.
-
- "Legal Entity" shall mean the union of the acting entity and all
- other entities that control, are controlled by, or are under common
- control with that entity. For the purposes of this definition,
- "control" means (i) the power, direct or indirect, to cause the
- direction or management of such entity, whether by contract or
- otherwise, or (ii) ownership of fifty percent (50%) or more of the
- outstanding shares, or (iii) beneficial ownership of such entity.
-
- "You" (or "Your") shall mean an individual or Legal Entity
- exercising permissions granted by this License.
-
- "Source" form shall mean the preferred form for making modifications,
- including but not limited to software source code, documentation
- source, and configuration files.
-
- "Object" form shall mean any form resulting from mechanical
- transformation or translation of a Source form, including but
- not limited to compiled object code, generated documentation,
- and conversions to other media types.
-
- "Work" shall mean the work of authorship, whether in Source or
- Object form, made available under the License, as indicated by a
- copyright notice that is included in or attached to the work
- (an example is provided in the Appendix below).
-
- "Derivative Works" shall mean any work, whether in Source or Object
- form, that is based on (or derived from) the Work and for which the
- editorial revisions, annotations, elaborations, or other modifications
- represent, as a whole, an original work of authorship. For the purposes
- of this License, Derivative Works shall not include works that remain
- separable from, or merely link (or bind by name) to the interfaces of,
- the Work and Derivative Works thereof.
-
- "Contribution" shall mean any work of authorship, including
- the original version of the Work and any modifications or additions
- to that Work or Derivative Works thereof, that is intentionally
- submitted to Licensor for inclusion in the Work by the copyright owner
- or by an individual or Legal Entity authorized to submit on behalf of
- the copyright owner. For the purposes of this definition, "submitted"
- means any form of electronic, verbal, or written communication sent
- to the Licensor or its representatives, including but not limited to
- communication on electronic mailing lists, source code control systems,
- and issue tracking systems that are managed by, or on behalf of, the
- Licensor for the purpose of discussing and improving the Work, but
- excluding communication that is conspicuously marked or otherwise
- designated in writing by the copyright owner as "Not a Contribution."
-
- "Contributor" shall mean Licensor and any individual or Legal Entity
- on behalf of whom a Contribution has been received by Licensor and
- subsequently incorporated within the Work.
-
- 2. Grant of Copyright License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- copyright license to reproduce, prepare Derivative Works of,
- publicly display, publicly perform, sublicense, and distribute the
- Work and such Derivative Works in Source or Object form.
-
- 3. Grant of Patent License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- (except as stated in this section) patent license to make, have made,
- use, offer to sell, sell, import, and otherwise transfer the Work,
- where such license applies only to those patent claims licensable
- by such Contributor that are necessarily infringed by their
- Contribution(s) alone or by combination of their Contribution(s)
- with the Work to which such Contribution(s) was submitted. If You
- institute patent litigation against any entity (including a
- cross-claim or counterclaim in a lawsuit) alleging that the Work
- or a Contribution incorporated within the Work constitutes direct
- or contributory patent infringement, then any patent licenses
- granted to You under this License for that Work shall terminate
- as of the date such litigation is filed.
-
- 4. Redistribution. You may reproduce and distribute copies of the
- Work or Derivative Works thereof in any medium, with or without
- modifications, and in Source or Object form, provided that You
- meet the following conditions:
-
- (a) You must give any other recipients of the Work or
- Derivative Works a copy of this License; and
-
- (b) You must cause any modified files to carry prominent notices
- stating that You changed the files; and
-
- (c) You must retain, in the Source form of any Derivative Works
- that You distribute, all copyright, patent, trademark, and
- attribution notices from the Source form of the Work,
- excluding those notices that do not pertain to any part of
- the Derivative Works; and
-
- (d) If the Work includes a "NOTICE" text file as part of its
- distribution, then any Derivative Works that You distribute must
- include a readable copy of the attribution notices contained
- within such NOTICE file, excluding those notices that do not
- pertain to any part of the Derivative Works, in at least one
- of the following places: within a NOTICE text file distributed
- as part of the Derivative Works; within the Source form or
- documentation, if provided along with the Derivative Works; or,
- within a display generated by the Derivative Works, if and
- wherever such third-party notices normally appear. The contents
- of the NOTICE file are for informational purposes only and
- do not modify the License. You may add Your own attribution
- notices within Derivative Works that You distribute, alongside
- or as an addendum to the NOTICE text from the Work, provided
- that such additional attribution notices cannot be construed
- as modifying the License.
-
- You may add Your own copyright statement to Your modifications and
- may provide additional or different license terms and conditions
- for use, reproduction, or distribution of Your modifications, or
- for any such Derivative Works as a whole, provided Your use,
- reproduction, and distribution of the Work otherwise complies with
- the conditions stated in this License.
-
- 5. Submission of Contributions. Unless You explicitly state otherwise,
- any Contribution intentionally submitted for inclusion in the Work
- by You to the Licensor shall be under the terms and conditions of
- this License, without any additional terms or conditions.
- Notwithstanding the above, nothing herein shall supersede or modify
- the terms of any separate license agreement you may have executed
- with Licensor regarding such Contributions.
-
- 6. Trademarks. This License does not grant permission to use the trade
- names, trademarks, service marks, or product names of the Licensor,
- except as required for reasonable and customary use in describing the
- origin of the Work and reproducing the content of the NOTICE file.
-
- 7. Disclaimer of Warranty. Unless required by applicable law or
- agreed to in writing, Licensor provides the Work (and each
- Contributor provides its Contributions) on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- implied, including, without limitation, any warranties or conditions
- of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
- PARTICULAR PURPOSE. You are solely responsible for determining the
- appropriateness of using or redistributing the Work and assume any
- risks associated with Your exercise of permissions under this License.
-
- 8. Limitation of Liability. In no event and under no legal theory,
- whether in tort (including negligence), contract, or otherwise,
- unless required by applicable law (such as deliberate and grossly
- negligent acts) or agreed to in writing, shall any Contributor be
- liable to You for damages, including any direct, indirect, special,
- incidental, or consequential damages of any character arising as a
- result of this License or out of the use or inability to use the
- Work (including but not limited to damages for loss of goodwill,
- work stoppage, computer failure or malfunction, or any and all
- other commercial damages or losses), even if such Contributor
- has been advised of the possibility of such damages.
-
- 9. Accepting Warranty or Additional Liability. While redistributing
- the Work or Derivative Works thereof, You may choose to offer,
- and charge a fee for, acceptance of support, warranty, indemnity,
- or other liability obligations and/or rights consistent with this
- License. However, in accepting such obligations, You may act only
- on Your own behalf and on Your sole responsibility, not on behalf
- of any other Contributor, and only if You agree to indemnify,
- defend, and hold each Contributor harmless for any liability
- incurred by, or claims asserted against, such Contributor by reason
- of your accepting any such warranty or additional liability.
-
- END OF TERMS AND CONDITIONS
-
- APPENDIX: How to apply the Apache License to your work.
-
- To apply the Apache License to your work, attach the following
- boilerplate notice, with the fields enclosed by brackets "[]"
- replaced with your own identifying information. (Don't include
- the brackets!) The text should be enclosed in the appropriate
- comment syntax for the file format. We also recommend that a
- file or class name and description of purpose be included on the
- same "printed page" as the copyright notice for easier
- identification within third-party archives.
-
- Copyright [yyyy] [name of copyright owner]
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-
-APACHE NIFI SUBCOMPONENTS:
-
-The Apache NiFi project contains subcomponents with separate copyright
-notices and license terms. Your use of the source code for the these
-subcomponents is subject to the terms and conditions of the following
-licenses.
-
-The binary distribution of this product bundles 'HdrHistogram' which is available under a 2-Clause BSD style license:
-
- Copyright (c) 2012, 2013, 2014 Gil Tene
- Copyright (c) 2014 Michael Barker
- Copyright (c) 2014 Matt Warren
- All rights reserved.
-
- Redistribution and use in source and binary forms, with or without
- modification, are permitted provided that the following conditions are met:
-
- 1. Redistributions of source code must retain the above copyright notice,
- this list of conditions and the following disclaimer.
-
- 2. Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
-
- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
- LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
- THE POSSIBILITY OF SUCH DAMAGE.
-
-The binary distribution of this product bundles 'Bouncy Castle JDK 1.5 Provider'
- under an MIT style license.
-
- Copyright (c) 2000 - 2015 The Legion of the Bouncy Castle Inc. (http://www.bouncycastle.org)
-
- Permission is hereby granted, free of charge, to any person obtaining a copy
- of this software and associated documentation files (the "Software"), to deal
- in the Software without restriction, including without limitation the rights
- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- copies of the Software, and to permit persons to whom the Software is
- furnished to do so, subject to the following conditions:
-
- The above copyright notice and this permission notice shall be included in
- all copies or substantial portions of the Software.
-
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- THE SOFTWARE.
-
-
-The binary distribution of this product bundles 'JOpt Simple'
- under an MIT style license.
- Copyright (c) 2004-2015 Paul R. Holser, Jr.
-
- Permission is hereby granted, free of charge, to any person obtaining
- a copy of this software and associated documentation files (the
- "Software"), to deal in the Software without restriction, including
- without limitation the rights to use, copy, modify, merge, publish,
- distribute, sublicense, and/or sell copies of the Software, and to
- permit persons to whom the Software is furnished to do so, subject to
- the following conditions:
-
- The above copyright notice and this permission notice shall be
- included in all copies or substantial portions of the Software.
-
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
- EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
- MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
- NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
- LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
- OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
- WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-nar/src/main/resources/META-INF/NOTICE
deleted file mode 100644
index 8931a19f45..0000000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-nar/src/main/resources/META-INF/NOTICE
+++ /dev/null
@@ -1,411 +0,0 @@
-nifi-elasticsearch-5-nar
-Copyright 2015-2020 The Apache Software Foundation
-
-This product includes software developed at
-The Apache Software Foundation (http://www.apache.org/).
-
-******************
-Apache Software License v2
-******************
-
-The following binary components are provided under the Apache Software License v2
-
- (ASLv2) Elasticsearch
- The following NOTICE information applies:
- Elasticsearch
- Copyright 2009-2015 Elasticsearch
-
- (ASLv2) Apache Commons IO
- The following NOTICE information applies:
- Apache Commons IO
- Copyright 2002-2016 The Apache Software Foundation
-
- (ASLv2) Apache Lucene
- The following NOTICE information applies:
- Apache Lucene
- Copyright 2014 The Apache Software Foundation
-
- Includes software from other Apache Software Foundation projects,
- including, but not limited to:
- - Apache Ant
- - Apache Jakarta Regexp
- - Apache Commons
- - Apache Xerces
-
- ICU4J, (under analysis/icu) is licensed under an MIT styles license
- and Copyright (c) 1995-2008 International Business Machines Corporation and others
-
- Some data files (under analysis/icu/src/data) are derived from Unicode data such
- as the Unicode Character Database. See http://unicode.org/copyright.html for more
- details.
-
- Brics Automaton (under core/src/java/org/apache/lucene/util/automaton) is
- BSD-licensed, created by Anders Møller. See http://www.brics.dk/automaton/
-
- The levenshtein automata tables (under core/src/java/org/apache/lucene/util/automaton) were
- automatically generated with the moman/finenight FSA library, created by
- Jean-Philippe Barrette-LaPierre. This library is available under an MIT license,
- see http://sites.google.com/site/rrettesite/moman and
- http://bitbucket.org/jpbarrette/moman/overview/
-
- The class org.apache.lucene.util.WeakIdentityMap was derived from
- the Apache CXF project and is Apache License 2.0.
-
- The Google Code Prettify is Apache License 2.0.
- See http://code.google.com/p/google-code-prettify/
-
- JUnit (junit-4.10) is licensed under the Common Public License v. 1.0
- See http://junit.sourceforge.net/cpl-v10.html
-
- This product includes code (JaspellTernarySearchTrie) from Java Spelling Checkin
- g Package (jaspell): http://jaspell.sourceforge.net/
- License: The BSD License (http://www.opensource.org/licenses/bsd-license.php)
-
- The snowball stemmers in
- analysis/common/src/java/net/sf/snowball
- were developed by Martin Porter and Richard Boulton.
- The snowball stopword lists in
- analysis/common/src/resources/org/apache/lucene/analysis/snowball
- were developed by Martin Porter and Richard Boulton.
- The full snowball package is available from
- http://snowball.tartarus.org/
-
- The KStem stemmer in
- analysis/common/src/org/apache/lucene/analysis/en
- was developed by Bob Krovetz and Sergio Guzman-Lara (CIIR-UMass Amherst)
- under the BSD-license.
-
- The Arabic,Persian,Romanian,Bulgarian, and Hindi analyzers (common) come with a default
- stopword list that is BSD-licensed created by Jacques Savoy. These files reside in:
- analysis/common/src/resources/org/apache/lucene/analysis/ar/stopwords.txt,
- analysis/common/src/resources/org/apache/lucene/analysis/fa/stopwords.txt,
- analysis/common/src/resources/org/apache/lucene/analysis/ro/stopwords.txt,
- analysis/common/src/resources/org/apache/lucene/analysis/bg/stopwords.txt,
- analysis/common/src/resources/org/apache/lucene/analysis/hi/stopwords.txt
- See http://members.unine.ch/jacques.savoy/clef/index.html.
-
- The German,Spanish,Finnish,French,Hungarian,Italian,Portuguese,Russian and Swedish light stemmers
- (common) are based on BSD-licensed reference implementations created by Jacques Savoy and
- Ljiljana Dolamic. These files reside in:
- analysis/common/src/java/org/apache/lucene/analysis/de/GermanLightStemmer.java
- analysis/common/src/java/org/apache/lucene/analysis/de/GermanMinimalStemmer.java
- analysis/common/src/java/org/apache/lucene/analysis/es/SpanishLightStemmer.java
- analysis/common/src/java/org/apache/lucene/analysis/fi/FinnishLightStemmer.java
- analysis/common/src/java/org/apache/lucene/analysis/fr/FrenchLightStemmer.java
- analysis/common/src/java/org/apache/lucene/analysis/fr/FrenchMinimalStemmer.java
- analysis/common/src/java/org/apache/lucene/analysis/hu/HungarianLightStemmer.java
- analysis/common/src/java/org/apache/lucene/analysis/it/ItalianLightStemmer.java
- analysis/common/src/java/org/apache/lucene/analysis/pt/PortugueseLightStemmer.java
- analysis/common/src/java/org/apache/lucene/analysis/ru/RussianLightStemmer.java
- analysis/common/src/java/org/apache/lucene/analysis/sv/SwedishLightStemmer.java
-
- The Stempel analyzer (stempel) includes BSD-licensed software developed
- by the Egothor project http://egothor.sf.net/, created by Leo Galambos, Martin Kvapil,
- and Edmond Nolan.
-
- The Polish analyzer (stempel) comes with a default
- stopword list that is BSD-licensed created by the Carrot2 project. The file resides
- in stempel/src/resources/org/apache/lucene/analysis/pl/stopwords.txt.
- See http://project.carrot2.org/license.html.
-
- The SmartChineseAnalyzer source code (smartcn) was
- provided by Xiaoping Gao and copyright 2009 by www.imdict.net.
-
- WordBreakTestUnicode_*.java (under modules/analysis/common/src/test/)
- is derived from Unicode data such as the Unicode Character Database.
- See http://unicode.org/copyright.html for more details.
-
- The Morfologik analyzer (morfologik) includes BSD-licensed software
- developed by Dawid Weiss and Marcin Miłkowski (http://morfologik.blogspot.com/).
-
- Morfologik uses data from Polish ispell/myspell dictionary
- (http://www.sjp.pl/slownik/en/) licenced on the terms of (inter alia)
- LGPL and Creative Commons ShareAlike.
-
- Morfologic includes data from BSD-licensed dictionary of Polish (SGJP)
- (http://sgjp.pl/morfeusz/)
-
- Servlet-api.jar and javax.servlet-*.jar are under the CDDL license, the original
- source code for this can be found at http://www.eclipse.org/jetty/downloads.php
-
- ===========================================================================
- Kuromoji Japanese Morphological Analyzer - Apache Lucene Integration
- ===========================================================================
-
- This software includes a binary and/or source version of data from
-
- mecab-ipadic-2.7.0-20070801
-
- which can be obtained from
-
- http://atilika.com/releases/mecab-ipadic/mecab-ipadic-2.7.0-20070801.tar.gz
-
- or
-
- http://jaist.dl.sourceforge.net/project/mecab/mecab-ipadic/2.7.0-20070801/mecab-ipadic-2.7.0-20070801.tar.gz
-
- ===========================================================================
- mecab-ipadic-2.7.0-20070801 Notice
- ===========================================================================
-
- Nara Institute of Science and Technology (NAIST),
- the copyright holders, disclaims all warranties with regard to this
- software, including all implied warranties of merchantability and
- fitness, in no event shall NAIST be liable for
- any special, indirect or consequential damages or any damages
- whatsoever resulting from loss of use, data or profits, whether in an
- action of contract, negligence or other tortuous action, arising out
- of or in connection with the use or performance of this software.
-
- A large portion of the dictionary entries
- originate from ICOT Free Software. The following conditions for ICOT
- Free Software applies to the current dictionary as well.
-
- Each User may also freely distribute the Program, whether in its
- original form or modified, to any third party or parties, PROVIDED
- that the provisions of Section 3 ("NO WARRANTY") will ALWAYS appear
- on, or be attached to, the Program, which is distributed substantially
- in the same form as set out herein and that such intended
- distribution, if actually made, will neither violate or otherwise
- contravene any of the laws and regulations of the countries having
- jurisdiction over the User or the intended distribution itself.
-
- NO WARRANTY
-
- The program was produced on an experimental basis in the course of the
- research and development conducted during the project and is provided
- to users as so produced on an experimental basis. Accordingly, the
- program is provided without any warranty whatsoever, whether express,
- implied, statutory or otherwise. The term "warranty" used herein
- includes, but is not limited to, any warranty of the quality,
- performance, merchantability and fitness for a particular purpose of
- the program and the nonexistence of any infringement or violation of
- any right of any third party.
-
- Each user of the program will agree and understand, and be deemed to
- have agreed and understood, that there is no warranty whatsoever for
- the program and, accordingly, the entire risk arising from or
- otherwise connected with the program is assumed by the user.
-
- Therefore, neither ICOT, the copyright holder, or any other
- organization that participated in or was otherwise related to the
- development of the program and their respective officials, directors,
- officers and other employees shall be held liable for any and all
- damages, including, without limitation, general, special, incidental
- and consequential damages, arising out of or otherwise in connection
- with the use or inability to use the program or any product, material
- or result produced or otherwise obtained by using the program,
- regardless of whether they have been advised of, or otherwise had
- knowledge of, the possibility of such damages at any time during the
- project or thereafter. Each user will be deemed to have agreed to the
- foregoing by his or her commencement of use of the program. The term
- "use" as used herein includes, but is not limited to, the use,
- modification, copying and distribution of the program and the
- production of secondary products from the program.
-
- In the case where the program, whether in its original form or
- modified, was distributed or delivered to or received by a user from
- any person, organization or entity other than ICOT, unless it makes or
- grants independently of ICOT any specific warranty to the user in
- writing, such person, organization or entity, will also be exempted
- from and not be held liable to the user for any such damages as noted
- above as far as the program is concerned.
-
- (ASLv2) Carrotsearch HPPC
- The following NOTICE information applies:
- HPPC borrowed code, ideas or both from:
-
- * Apache Lucene, http://lucene.apache.org/
- (Apache license)
- * Fastutil, http://fastutil.di.unimi.it/
- (Apache license)
- * Koloboke, https://github.com/OpenHFT/Koloboke
- (Apache license)
-
- (ASLv2) Joda Time
- The following NOTICE information applies:
- This product includes software developed by
- Joda.org (http://www.joda.org/).
-
- (ASLv2) The Netty Project
- The following NOTICE information applies:
-
- (ASLv2) Jackson JSON processor
- The following NOTICE information applies:
- # Jackson JSON processor
-
- Jackson is a high-performance, Free/Open Source JSON processing library.
- It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
- been in development since 2007.
- It is currently developed by a community of developers, as well as supported
- commercially by FasterXML.com.
-
- ## Licensing
-
- Jackson core and extension components may licensed under different licenses.
- To find the details that apply to this artifact see the accompanying LICENSE file.
- For more information, including possible other licensing options, contact
- FasterXML.com (http://fasterxml.com).
-
- ## Credits
-
- A list of contributors may be found from CREDITS file, which is included
- in some artifacts (usually source distributions); but is always available
- from the source code management (SCM) system project uses.
-
- The Netty Project
- =================
-
- Please visit the Netty web site for more information:
-
- * http://netty.io/
-
- Copyright 2011 The Netty Project
-
- The Netty Project licenses this file to you under the Apache License,
- version 2.0 (the "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at:
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- License for the specific language governing permissions and limitations
- under the License.
-
-  Also, please refer to each LICENSE.<component>.txt file, which is located in
- the 'license' directory of the distribution file, for the license terms of the
- components that this product depends on.
-
- -------------------------------------------------------------------------------
- This product contains the extensions to Java Collections Framework which has
- been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:
-
- * LICENSE:
- * license/LICENSE.jsr166y.txt (Public Domain)
- * HOMEPAGE:
- * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/
- * http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/
-
- This product contains a modified version of Robert Harder's Public Domain
- Base64 Encoder and Decoder, which can be obtained at:
-
- * LICENSE:
- * license/LICENSE.base64.txt (Public Domain)
- * HOMEPAGE:
- * http://iharder.sourceforge.net/current/java/base64/
-
- This product contains a modified version of 'JZlib', a re-implementation of
- zlib in pure Java, which can be obtained at:
-
- * LICENSE:
- * license/LICENSE.jzlib.txt (BSD Style License)
- * HOMEPAGE:
- * http://www.jcraft.com/jzlib/
-
- This product contains a modified version of 'Webbit', a Java event based
- WebSocket and HTTP server:
-
- * LICENSE:
- * license/LICENSE.webbit.txt (BSD License)
- * HOMEPAGE:
- * https://github.com/joewalnes/webbit
-
- This product optionally depends on 'Protocol Buffers', Google's data
- interchange format, which can be obtained at:
-
- * LICENSE:
- * license/LICENSE.protobuf.txt (New BSD License)
- * HOMEPAGE:
- * http://code.google.com/p/protobuf/
-
- This product optionally depends on 'Bouncy Castle Crypto APIs' to generate
- a temporary self-signed X.509 certificate when the JVM does not provide the
- equivalent functionality. It can be obtained at:
-
- * LICENSE:
- * license/LICENSE.bouncycastle.txt (MIT License)
- * HOMEPAGE:
- * http://www.bouncycastle.org/
-
- This product optionally depends on 'SLF4J', a simple logging facade for Java,
- which can be obtained at:
-
- * LICENSE:
- * license/LICENSE.slf4j.txt (MIT License)
- * HOMEPAGE:
- * http://www.slf4j.org/
-
- This product optionally depends on 'Apache Log4J', a logging framework,
- which can be obtained at:
-
- * LICENSE:
- * license/LICENSE.log4j.txt (Apache License 2.0)
- * HOMEPAGE:
- * http://logging.apache.org/log4j/
-
- This product optionally depends on 'JBoss Logging', a logging framework,
- which can be obtained at:
-
- * LICENSE:
- * license/LICENSE.jboss-logging.txt (GNU LGPL 2.1)
- * HOMEPAGE:
- * http://anonsvn.jboss.org/repos/common/common-logging-spi/
-
- This product optionally depends on 'Apache Felix', an open source OSGi
- framework implementation, which can be obtained at:
-
- * LICENSE:
- * license/LICENSE.felix.txt (Apache License 2.0)
- * HOMEPAGE:
- * http://felix.apache.org/
-
- (ASLv2) t-digest
- The following NOTICE information applies:
- The code for the t-digest was originally authored by Ted Dunning
- A number of small but very helpful changes have been contributed by Adrien Grand (https://github.com/jpountz)
-
- (ASLv2) Apache Commons Codec
- The following NOTICE information applies:
- Apache Commons Codec
- Copyright 2002-2016 The Apache Software Foundation
-
- This product includes software developed at
- The Apache Software Foundation (http://www.apache.org/).
-
- src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
- contains test data from http://aspell.net/test/orig/batch0.tab.
- Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
-
- ===============================================================================
-
- The content of package org.apache.commons.codec.language.bm has been translated
- from the original php source code available at http://stevemorse.org/phoneticinfo.htm
- with permission from the original authors.
- Original source copyright:
- Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
-
- (ASLv2) Apache Commons Lang
- The following NOTICE information applies:
- Apache Commons Lang
- Copyright 2001-2015 The Apache Software Foundation
-
- This product includes software from the Spring Framework,
- under the Apache License 2.0 (see: StringUtils.containsWhitespace())
-
- (ASLv2) Apache HttpComponents
- The following NOTICE information applies:
- Apache HttpComponents Client
- Copyright 1999-2016 The Apache Software Foundation
-
- This product includes software developed at
- The Apache Software Foundation (http://www.apache.org/).
-
- (ASLv2) Apache Log4J
- The following NOTICE information applies:
- Apache log4j
- Copyright 2010 The Apache Software Foundation
-
- This product includes software developed at
- The Apache Software Foundation (http://www.apache.org/).
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/pom.xml
deleted file mode 100644
index 6f487200c2..0000000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/pom.xml
+++ /dev/null
@@ -1,109 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-      http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <artifactId>nifi-elasticsearch-bundle</artifactId>
-        <groupId>org.apache.nifi</groupId>
-        <version>1.16.0-SNAPSHOT</version>
-    </parent>
-
-    <artifactId>nifi-elasticsearch-5-processors</artifactId>
-    <packaging>jar</packaging>
-
-    <properties>
- 2.7
-        <es.version>5.0.1</es.version>
-        <lucene.version>6.2.1</lucene.version>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-api</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-properties</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-processor-utils</artifactId>
-            <version>1.16.0-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-mock</artifactId>
-            <version>1.16.0-SNAPSHOT</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.elasticsearch.client</groupId>
-            <artifactId>transport</artifactId>
-            <version>${es.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.elasticsearch.client</groupId>
-            <artifactId>rest</artifactId>
-            <version>${es.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>commons-logging</groupId>
-                    <artifactId>commons-logging</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-ssl-context-service-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
-            <version>2.10.0</version>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>jcl-over-slf4j</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-to-slf4j</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-ssl-context-service</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.rat</groupId>
-                <artifactId>apache-rat-plugin</artifactId>
-                <configuration>
-                    <excludes>
-                        <exclude>src/test/resources/*.json</exclude>
-                    </excludes>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearch5Processor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearch5Processor.java
deleted file mode 100644
index 5017d2ccd9..0000000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearch5Processor.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.elasticsearch;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.util.StringUtils;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * A base class for all Elasticsearch processors
- */
-abstract class AbstractElasticsearch5Processor extends AbstractProcessor {
-
- public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
- .name("el5-ssl-context-service")
- .displayName("SSL Context Service")
- .description("The SSL Context Service used to provide client certificate information for TLS/SSL "
- + "connections. This service only applies if the Elasticsearch endpoint(s) have been secured with TLS/SSL.")
- .required(false)
- .identifiesControllerService(SSLContextService.class)
- .build();
-
- protected static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
- .name("el5-charset")
- .displayName("Character Set")
- .description("Specifies the character set of the document data.")
- .required(true)
- .defaultValue("UTF-8")
- .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
- public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
- .name("el5-username")
- .displayName("Username")
- .description("Username to access the Elasticsearch cluster")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
-
- public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
- .name("el5-password")
- .displayName("Password")
- .description("Password to access the Elasticsearch cluster")
- .required(false)
- .sensitive(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- protected abstract void createElasticsearchClient(ProcessContext context) throws ProcessException;
-
- @Override
-    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
-        Set<ValidationResult> results = new HashSet<>();
-
- // Ensure that if username or password is set, then the other is too
- String userName = validationContext.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
- String password = validationContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
- if (StringUtils.isEmpty(userName) != StringUtils.isEmpty(password)) {
- results.add(new ValidationResult.Builder().valid(false).explanation(
- "If username or password is specified, then the other must be specified as well").build());
- }
-
- return results;
- }
-
- public void setup(ProcessContext context) {
- // Create the client if one does not already exist
- createElasticsearchClient(context);
- }
-
-}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearch5TransportClientProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearch5TransportClientProcessor.java
deleted file mode 100644
index 4196babaf8..0000000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearch5TransportClientProcessor.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.elasticsearch;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.resource.ResourceCardinality;
-import org.apache.nifi.components.resource.ResourceType;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.util.StringUtils;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.transport.client.PreBuiltTransportClient;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
-
-abstract class AbstractElasticsearch5TransportClientProcessor extends AbstractElasticsearch5Processor {
-
- protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder()
- .name("el5-cluster-name")
- .displayName("Cluster Name")
- .description("Name of the ES cluster (for example, elasticsearch_brew). Defaults to 'elasticsearch'")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .defaultValue("elasticsearch")
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
-
- protected static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder()
- .name("el5-hosts")
- .displayName("ElasticSearch Hosts")
- .description("ElasticSearch Hosts, which should be comma separated and colon for hostname/port "
- + "host1:port,host2:port,.... For example testcluster:9300. This processor uses the Transport Client to "
- + "connect to hosts. The default transport client port is 9300.")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor PROP_XPACK_LOCATION = new PropertyDescriptor.Builder()
- .name("el5-xpack-location")
- .displayName("X-Pack Transport Location")
- .description("Specifies the path to the JAR(s) for the Elasticsearch X-Pack Transport feature. "
- + "If the Elasticsearch cluster has been secured with the X-Pack plugin, then the X-Pack Transport "
- + "JARs must also be available to this processor. Note: Do NOT place the X-Pack JARs into NiFi's "
- + "lib/ directory, doing so will prevent the X-Pack Transport JARs from being loaded.")
- .required(false)
- .identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, ResourceType.DIRECTORY)
- .dynamicallyModifiesClasspath(true)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
-
- protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder()
- .name("el5-ping-timeout")
- .displayName("ElasticSearch Ping Timeout")
- .description("The ping timeout used to determine when a node is unreachable. " +
- "For example, 5s (5 seconds). If non-local recommended is 30s")
- .required(true)
- .defaultValue("5s")
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
-
- protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder()
- .name("el5-sampler-interval")
- .displayName("Sampler Interval")
- .description("How often to sample / ping the nodes listed and connected. For example, 5s (5 seconds). "
- + "If non-local recommended is 30s.")
- .required(true)
- .defaultValue("5s")
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
-
-    protected final AtomicReference<Client> esClient = new AtomicReference<>();
-    protected List<InetSocketAddress> esHosts;
-
- /**
- * Instantiate ElasticSearch Client. This should be called by subclasses' @OnScheduled method to create a client
- * if one does not yet exist. If called when scheduled, closeClient() should be called by the subclasses' @OnStopped
- * method so the client will be destroyed when the processor is stopped.
- *
- * @param context The context for this processor
- * @throws ProcessException if an error occurs while creating an Elasticsearch client
- */
- @Override
- protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
-
- ComponentLog log = getLogger();
- if (esClient.get() != null) {
- return;
- }
-
- log.debug("Creating ElasticSearch Client");
- try {
- final String clusterName = context.getProperty(CLUSTER_NAME).evaluateAttributeExpressions().getValue();
- final String pingTimeout = context.getProperty(PING_TIMEOUT).evaluateAttributeExpressions().getValue();
- final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).evaluateAttributeExpressions().getValue();
- final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
- final String password = context.getProperty(PASSWORD).getValue();
-
- final SSLContextService sslService =
- context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-
- Settings.Builder settingsBuilder = Settings.builder()
- .put("cluster.name", clusterName)
- .put("client.transport.ping_timeout", pingTimeout)
- .put("client.transport.nodes_sampler_interval", samplerInterval);
-
- String xPackUrl = context.getProperty(PROP_XPACK_LOCATION).evaluateAttributeExpressions().getValue();
- if (sslService != null) {
- settingsBuilder.put("xpack.security.transport.ssl.enabled", "true");
- if (!StringUtils.isEmpty(sslService.getKeyStoreFile())) {
- settingsBuilder.put("xpack.ssl.keystore.path", sslService.getKeyStoreFile());
- }
- if (!StringUtils.isEmpty(sslService.getKeyStorePassword())) {
- settingsBuilder.put("xpack.ssl.keystore.password", sslService.getKeyStorePassword());
- }
- if (!StringUtils.isEmpty(sslService.getKeyPassword())) {
- settingsBuilder.put("xpack.ssl.keystore.key_password", sslService.getKeyPassword());
- }
- if (!StringUtils.isEmpty(sslService.getTrustStoreFile())) {
- settingsBuilder.put("xpack.ssl.truststore.path", sslService.getTrustStoreFile());
- }
- if (!StringUtils.isEmpty(sslService.getTrustStorePassword())) {
- settingsBuilder.put("xpack.ssl.truststore.password", sslService.getTrustStorePassword());
- }
- }
-
- // Set username and password for X-Pack
- if (!StringUtils.isEmpty(username)) {
- StringBuffer secureUser = new StringBuffer(username);
- if (!StringUtils.isEmpty(password)) {
- secureUser.append(":");
- secureUser.append(password);
- }
- settingsBuilder.put("xpack.security.user", secureUser);
- }
-
- final String hosts = context.getProperty(HOSTS).evaluateAttributeExpressions().getValue();
- esHosts = getEsHosts(hosts);
- Client transportClient = getTransportClient(settingsBuilder, xPackUrl, username, password, esHosts, log);
- esClient.set(transportClient);
-
- } catch (Exception e) {
- log.error("Failed to create Elasticsearch client due to {}", new Object[]{e}, e);
- throw new ProcessException(e);
- }
- }
-
- protected Client getTransportClient(Settings.Builder settingsBuilder, String xPackPath,
- String username, String password,
-                                        List<InetSocketAddress> esHosts, ComponentLog log)
- throws MalformedURLException {
-
- // Map of headers
-        Map<String, String> headers = new HashMap<>();
-
- TransportClient transportClient = null;
-
- // See if the Elasticsearch X-Pack JAR locations were specified, and create the
- // authorization token if username and password are supplied.
- if (!StringUtils.isBlank(xPackPath)) {
- ClassLoader xPackClassloader = Thread.currentThread().getContextClassLoader();
- try {
- // Get the plugin class
-                Class<?> xPackTransportClientClass = Class.forName("org.elasticsearch.xpack.client.PreBuiltXPackTransportClient", true, xPackClassloader);
-                Constructor<?> ctor = xPackTransportClientClass.getConstructor(Settings.class, Class[].class);
-
- if (!StringUtils.isEmpty(username) && !StringUtils.isEmpty(password)) {
-
- // Need a couple of classes from the X-Path Transport JAR to build the token
-                    Class<?> usernamePasswordTokenClass =
- Class.forName("org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken", true, xPackClassloader);
-
-                    Class<?> securedStringClass =
- Class.forName("org.elasticsearch.xpack.security.authc.support.SecuredString", true, xPackClassloader);
-
-                    Constructor<?> securedStringCtor = securedStringClass.getConstructor(char[].class);
- Object securePasswordString = securedStringCtor.newInstance(password.toCharArray());
-
- Method basicAuthHeaderValue = usernamePasswordTokenClass.getMethod("basicAuthHeaderValue", String.class, securedStringClass);
- String authToken = (String) basicAuthHeaderValue.invoke(null, username, securePasswordString);
- if (authToken != null) {
- headers.put("Authorization", authToken);
- }
- transportClient = (TransportClient) ctor.newInstance(settingsBuilder.build(), new Class[0]);
- }
- } catch (ClassNotFoundException
- | NoSuchMethodException
- | InstantiationException
- | IllegalAccessException
- | InvocationTargetException xPackLoadException) {
- throw new ProcessException("X-Pack plugin could not be loaded and/or configured", xPackLoadException);
- }
- } else {
- getLogger().debug("No X-Pack Transport location specified, secure connections and/or authorization will not be available");
- }
- // If transportClient is null, either the processor is not configured for secure connections or there is a problem with config
- // (which is logged), so continue with a non-secure client
- if (transportClient == null) {
- transportClient = new PreBuiltTransportClient(settingsBuilder.build());
- }
- if (esHosts != null) {
- for (final InetSocketAddress host : esHosts) {
- try {
- transportClient.addTransportAddress(new InetSocketTransportAddress(host));
- } catch (IllegalArgumentException iae) {
- log.error("Could not add transport address {}", new Object[]{host});
- }
- }
- }
-
- Client client = transportClient.filterWithHeader(headers);
- return client;
- }
-
- /**
- * Dispose of ElasticSearch client
- */
- public void closeClient() {
- Client client = esClient.get();
- if (client != null) {
- getLogger().info("Closing ElasticSearch Client");
- esClient.set(null);
- client.close();
- }
- }
-
- /**
- * Get the ElasticSearch hosts from a NiFi attribute, e.g.
- *
- * @param hosts A comma-separated list of ElasticSearch hosts (host:port,host2:port2, etc.)
- * @return List of InetSocketAddresses for the ES hosts
- */
-    private List<InetSocketAddress> getEsHosts(String hosts) {
-
- if (hosts == null) {
- return null;
- }
-        final List<String> esList = Arrays.asList(hosts.split(","));
-        List<InetSocketAddress> esHosts = new ArrayList<>();
-
- for (String item : esList) {
-
- String[] addresses = item.split(":");
- final String hostName = addresses[0].trim();
- final int port = Integer.parseInt(addresses[1].trim());
-
- esHosts.add(new InetSocketAddress(hostName, port));
- }
- return esHosts;
- }
-
-
-}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteElasticsearch5.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteElasticsearch5.java
deleted file mode 100644
index f4a6dbaaaf..0000000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteElasticsearch5.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.elasticsearch;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.elasticsearch.ElasticsearchTimeoutException;
-import org.elasticsearch.action.delete.DeleteRequestBuilder;
-import org.elasticsearch.action.delete.DeleteResponse;
-import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.transport.ReceiveTimeoutTransportException;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-
-@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@EventDriven
-@Tags({"elasticsearch", "elasticsearch 5", "delete", "remove"})
-@CapabilityDescription("Delete a document from Elasticsearch 5.0 by document id. If the cluster has been configured for authorization and/or secure "
- + "transport (SSL/TLS), and the X-Pack plugin is available, secure connections can be made.")
-@WritesAttributes({
- @WritesAttribute(attribute = DeleteElasticsearch5.ES_ERROR_MESSAGE, description = "The message attribute in case of error"),
- @WritesAttribute(attribute = DeleteElasticsearch5.ES_FILENAME, description = "The filename attribute which is set to the document identifier"),
- @WritesAttribute(attribute = DeleteElasticsearch5.ES_INDEX, description = "The Elasticsearch index containing the document"),
- @WritesAttribute(attribute = DeleteElasticsearch5.ES_TYPE, description = "The Elasticsearch document type"),
- @WritesAttribute(attribute = DeleteElasticsearch5.ES_REST_STATUS, description = "The filename attribute with rest status")
-})
-@SeeAlso({FetchElasticsearch5.class,PutElasticsearch5.class})
-public class DeleteElasticsearch5 extends AbstractElasticsearch5TransportClientProcessor {
-
- public static final String UNABLE_TO_DELETE_DOCUMENT_MESSAGE = "Unable to delete document";
-
- public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
- .description("All FlowFile corresponding to the deleted document from Elasticsearch are routed to this relationship").build();
-
- public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
- .description("All FlowFile corresponding to delete document that failed from Elasticsearch are routed to this relationship").build();
-
- public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
- .description("A FlowFile is routed to this relationship if the document cannot be deleted because or retryable exception like timeout or node not available")
- .build();
-
- public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found")
- .description("A FlowFile is routed to this relationship if the specified document was not found in elasticsearch")
- .build();
-
- public static final PropertyDescriptor DOCUMENT_ID = new PropertyDescriptor.Builder()
- .name("el5-delete-document-id")
- .displayName("Document Identifier")
- .description("The identifier for the document to be deleted")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
- .name("el5-delete-index")
- .displayName("Index")
- .description("The name of the index to delete the document from")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
- .name("el5-delete-type")
- .displayName("Type")
- .description("The type of this document to be deleted")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- private static final Set<Relationship> relationships;
- private static final List<PropertyDescriptor> propertyDescriptors;
-
- public static final String ES_ERROR_MESSAGE = "es.error.message";
- public static final String ES_FILENAME = "filename";
- public static final String ES_INDEX = "es.index";
- public static final String ES_TYPE = "es.type";
- public static final String ES_REST_STATUS = "es.rest.status";
-
- static {
- final Set<Relationship> relations = new HashSet<>();
- relations.add(REL_SUCCESS);
- relations.add(REL_FAILURE);
- relations.add(REL_RETRY);
- relations.add(REL_NOT_FOUND);
- relationships = Collections.unmodifiableSet(relations);
-
- final List<PropertyDescriptor> descriptors = new ArrayList<>();
- descriptors.add(CLUSTER_NAME);
- descriptors.add(HOSTS);
- descriptors.add(PROP_SSL_CONTEXT_SERVICE);
- descriptors.add(PROP_XPACK_LOCATION);
- descriptors.add(USERNAME);
- descriptors.add(PASSWORD);
- descriptors.add(PING_TIMEOUT);
- descriptors.add(SAMPLER_INTERVAL);
- descriptors.add(DOCUMENT_ID);
- descriptors.add(INDEX);
- descriptors.add(TYPE);
-
- propertyDescriptors = Collections.unmodifiableList(descriptors);
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return relationships;
- }
-
- @Override
- public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return propertyDescriptors;
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-
- synchronized (esClient) {
- if(esClient.get() == null) {
- setup(context);
- }
- }
-
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
-
- final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
- final String documentId = context.getProperty(DOCUMENT_ID).evaluateAttributeExpressions(flowFile).getValue();
- final String documentType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
-
- final ComponentLog logger = getLogger();
-
- if ( StringUtils.isBlank(index) ) {
- logger.debug("Index is required but was empty {}", new Object [] { index });
- flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, "Index is required but was empty");
- session.transfer(flowFile,REL_FAILURE);
- return;
- }
- if ( StringUtils.isBlank(documentType) ) {
- logger.debug("Document type is required but was empty {}", new Object [] { documentType });
- flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, "Document type is required but was empty");
- session.transfer(flowFile,REL_FAILURE);
- return;
- }
- if ( StringUtils.isBlank(documentId) ) {
- logger.debug("Document id is required but was empty {}", new Object [] { documentId });
- flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, "Document id is required but was empty");
- session.transfer(flowFile,REL_FAILURE);
- return;
- }
-
- flowFile = session.putAllAttributes(flowFile, new HashMap<String, String>() {{
- put(ES_FILENAME, documentId);
- put(ES_INDEX, index);
- put(ES_TYPE, documentType);
- }});
-
- try {
-
- logger.debug("Deleting document {}/{}/{} from Elasticsearch", new Object[]{index, documentType, documentId});
- DeleteRequestBuilder requestBuilder = prepareDeleteRequest(index, documentId, documentType);
- final DeleteResponse response = doDelete(requestBuilder);
-
- if (response.status() != RestStatus.OK) {
- logger.warn("Failed to delete document {}/{}/{} from Elasticsearch: Status {}",
- new Object[]{index, documentType, documentId, response.status()});
- flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, UNABLE_TO_DELETE_DOCUMENT_MESSAGE);
- flowFile = session.putAttribute(flowFile, ES_REST_STATUS, response.status().toString());
- context.yield();
- if ( response.status() == RestStatus.NOT_FOUND ) {
- session.transfer(flowFile, REL_NOT_FOUND);
- } else {
- session.transfer(flowFile, REL_FAILURE);
- }
- } else {
- logger.debug("Elasticsearch document " + documentId + " deleted");
- session.transfer(flowFile, REL_SUCCESS);
- }
- } catch ( ElasticsearchTimeoutException
- | ReceiveTimeoutTransportException exception) {
- logger.error("Failed to delete document {} from Elasticsearch due to {}",
- new Object[]{documentId, exception.getLocalizedMessage()}, exception);
- flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, exception.getLocalizedMessage());
- session.transfer(flowFile, REL_RETRY);
- context.yield();
-
- } catch (Exception e) {
- logger.error("Failed to delete document {} from Elasticsearch due to {}", new Object[]{documentId, e.getLocalizedMessage()}, e);
- flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, e.getLocalizedMessage());
- session.transfer(flowFile, REL_FAILURE);
- context.yield();
- }
- }
-
- protected DeleteRequestBuilder prepareDeleteRequest(final String index, final String documentId, final String documentType) {
- return esClient.get().prepareDelete(index, documentType, documentId);
- }
-
- protected DeleteResponse doDelete(DeleteRequestBuilder requestBuilder)
- throws InterruptedException, ExecutionException {
- return requestBuilder.execute().get();
- }
-
- @OnStopped
- public void closeClient() {
- super.closeClient();
- }
-}
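
For reference, the core delete path of the removed DeleteElasticsearch5 processor, condensed from the deleted source above (NiFi session handling omitted; client here stands for the Elasticsearch 5 TransportClient held in esClient):

    DeleteRequestBuilder request = client.prepareDelete(index, documentType, documentId);
    DeleteResponse response = request.execute().get();
    if (response.status() == RestStatus.NOT_FOUND) {
        // document did not exist: route the FlowFile to the "not found" relationship
    } else if (response.status() != RestStatus.OK) {
        // other failure: record es.rest.status and route to failure
    } else {
        // deleted: route to success
    }
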
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch5.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch5.java
deleted file mode 100644
index 4805c56777..0000000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch5.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.elasticsearch;
-
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.elasticsearch.ElasticsearchTimeoutException;
-import org.elasticsearch.action.get.GetRequestBuilder;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.client.transport.NoNodeAvailableException;
-import org.elasticsearch.node.NodeClosedException;
-import org.elasticsearch.transport.ReceiveTimeoutTransportException;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-
-@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@EventDriven
-@SupportsBatching
-@Tags({"elasticsearch", "elasticsearch 5", "fetch", "read", "get"})
-@CapabilityDescription("Retrieves a document from Elasticsearch using the specified connection properties and the "
- + "identifier of the document to retrieve. If the cluster has been configured for authorization and/or secure "
- + "transport (SSL/TLS), and the X-Pack plugin is available, secure connections can be made. This processor "
- + "supports Elasticsearch 5.x clusters.")
-@WritesAttributes({
- @WritesAttribute(attribute = "filename", description = "The filename attributes is set to the document identifier"),
- @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
- @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type")
-})
-@SeeAlso({DeleteElasticsearch5.class,PutElasticsearch5.class})
-public class FetchElasticsearch5 extends AbstractElasticsearch5TransportClientProcessor {
-
- public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
- .description("All FlowFiles that are read from Elasticsearch are routed to this relationship").build();
-
- public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
- .description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship").build();
-
- public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
- .description("A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may succeed")
- .build();
-
- public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found")
- .description("A FlowFile is routed to this relationship if the specified document does not exist in the Elasticsearch cluster")
- .build();
-
- public static final PropertyDescriptor DOC_ID = new PropertyDescriptor.Builder()
- .name("el5-fetch-doc-id")
- .displayName("Document Identifier")
- .description("The identifier for the document to be fetched")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
- .name("el5-fetch-index")
- .displayName("Index")
- .description("The name of the index to read from")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
- .name("el5-fetch-type")
- .displayName("Type")
- .description("The type of this document (used by Elasticsearch for indexing and searching)")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- private static final Set<Relationship> relationships;
- private static final List<PropertyDescriptor> propertyDescriptors;
-
- static {
- final Set<Relationship> _rels = new HashSet<>();
- _rels.add(REL_SUCCESS);
- _rels.add(REL_FAILURE);
- _rels.add(REL_RETRY);
- _rels.add(REL_NOT_FOUND);
- relationships = Collections.unmodifiableSet(_rels);
-
- final List<PropertyDescriptor> descriptors = new ArrayList<>();
- descriptors.add(CLUSTER_NAME);
- descriptors.add(HOSTS);
- descriptors.add(PROP_SSL_CONTEXT_SERVICE);
- descriptors.add(PROP_XPACK_LOCATION);
- descriptors.add(USERNAME);
- descriptors.add(PASSWORD);
- descriptors.add(PING_TIMEOUT);
- descriptors.add(SAMPLER_INTERVAL);
- descriptors.add(DOC_ID);
- descriptors.add(INDEX);
- descriptors.add(TYPE);
- descriptors.add(CHARSET);
-
- propertyDescriptors = Collections.unmodifiableList(descriptors);
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return relationships;
- }
-
- @Override
- public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return propertyDescriptors;
- }
-
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-
- synchronized (esClient) {
- if(esClient.get() == null) {
- super.setup(context);
- }
- }
-
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
-
- final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
- final String docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
- final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
- final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
-
- final ComponentLog logger = getLogger();
- try {
-
- logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId});
- GetRequestBuilder getRequestBuilder = esClient.get().prepareGet(index, docType, docId);
- final GetResponse getResponse = getRequestBuilder.execute().actionGet();
-
- if (getResponse == null || !getResponse.isExists()) {
- logger.debug("Failed to read {}/{}/{} from Elasticsearch: Document not found",
- new Object[]{index, docType, docId});
-
- // We couldn't find the document, so penalize it and send it to "not found"
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_NOT_FOUND);
- } else {
- flowFile = session.putAllAttributes(flowFile, new HashMap<String, String>() {{
- put("filename", docId);
- put("es.index", index);
- put("es.type", docType);
- }});
- flowFile = session.write(flowFile, new OutputStreamCallback() {
- @Override
- public void process(OutputStream out) throws IOException {
- out.write(getResponse.getSourceAsString().getBytes(charset));
- }
- });
- logger.debug("Elasticsearch document " + docId + " fetched, routing to success");
- // The document is JSON, so update the MIME type of the flow file
- flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
- session.getProvenanceReporter().fetch(flowFile, getResponse.remoteAddress().getAddress());
- session.transfer(flowFile, REL_SUCCESS);
- }
- } catch (NoNodeAvailableException
- | ElasticsearchTimeoutException
- | ReceiveTimeoutTransportException
- | NodeClosedException exceptionToRetry) {
- logger.error("Failed to read into Elasticsearch due to {}, this may indicate an error in configuration "
- + "(hosts, username/password, etc.), or this issue may be transient. Routing to retry",
- new Object[]{exceptionToRetry.getLocalizedMessage()}, exceptionToRetry);
- session.transfer(flowFile, REL_RETRY);
- context.yield();
-
- } catch (Exception e) {
- logger.error("Failed to read {} from Elasticsearch due to {}", new Object[]{flowFile, e.getLocalizedMessage()}, e);
- session.transfer(flowFile, REL_FAILURE);
- context.yield();
- }
- }
-
- /**
- * Dispose of ElasticSearch client
- */
- @OnStopped
- public void closeClient() {
- super.closeClient();
- }
-}
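
The fetch path removed above follows the same transport-client pattern; condensed from the deleted FetchElasticsearch5 source (FlowFile and provenance handling omitted, client again standing for the transport client):

    GetResponse getResponse = client.prepareGet(index, docType, docId).execute().actionGet();
    if (getResponse == null || !getResponse.isExists()) {
        // penalize the FlowFile and route it to "not found"
    } else {
        byte[] source = getResponse.getSourceAsString().getBytes(charset);
        // write source to the FlowFile content, set mime.type to application/json, route to success
    }
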
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch5.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch5.java
deleted file mode 100644
index 2cfe01deec..0000000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch5.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.elasticsearch;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.behavior.SystemResource;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.elasticsearch.ElasticsearchTimeoutException;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-
-import org.elasticsearch.client.transport.NoNodeAvailableException;
-import org.elasticsearch.node.NodeClosedException;
-import org.elasticsearch.transport.ReceiveTimeoutTransportException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-
-@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@EventDriven
-@SupportsBatching
-@Tags({"elasticsearch", "elasticsearch 5","insert", "update", "write", "put"})
-@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as "
- + "the index to insert into and the type of the document. If the cluster has been configured for authorization "
- + "and/or secure transport (SSL/TLS), and the X-Pack plugin is available, secure connections can be made. This processor "
- + "supports Elasticsearch 5.x clusters.")
-@SeeAlso({FetchElasticsearch5.class,PutElasticsearch5.class})
-@SystemResourceConsideration(resource = SystemResource.MEMORY)
-public class PutElasticsearch5 extends AbstractElasticsearch5TransportClientProcessor {
-
- private static final Validator NON_EMPTY_EL_VALIDATOR = (subject, value, context) -> {
- if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
- return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
- }
- return new ValidationResult.Builder().subject(subject).input(value).valid(value != null && !value.isEmpty()).explanation(subject + " cannot be empty").build();
- };
-
- static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
- .description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build();
-
- static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
- .description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build();
-
- static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
- .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
- .build();
-
- public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder()
- .name("el5-put-id-attribute")
- .displayName("Identifier Attribute")
- .description("The name of the attribute containing the identifier for each FlowFile")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
- .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
- .name("el5-put-index")
- .displayName("Index")
- .description("The name of the index to insert into")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(NON_EMPTY_EL_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
- .name("el5-put-type")
- .displayName("Type")
- .description("The type of this document (used by Elasticsearch for indexing and searching)")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(NON_EMPTY_EL_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
- .name("el5-put-index-op")
- .displayName("Index Operation")
- .description("The type of the operation used to index (index, update, upsert)")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(NON_EMPTY_EL_VALIDATOR)
- .defaultValue("index")
- .build();
-
- public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
- .name("el5-put-batch-size")
- .displayName("Batch Size")
- .description("The preferred number of FlowFiles to put to the database in a single transaction")
- .required(true)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .defaultValue("100")
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
-
- private static final Set<Relationship> relationships;
- private static final List<PropertyDescriptor> propertyDescriptors;
-
- static {
- final Set<Relationship> _rels = new HashSet<>();
- _rels.add(REL_SUCCESS);
- _rels.add(REL_FAILURE);
- _rels.add(REL_RETRY);
- relationships = Collections.unmodifiableSet(_rels);
-
- final List<PropertyDescriptor> descriptors = new ArrayList<>();
- descriptors.add(CLUSTER_NAME);
- descriptors.add(HOSTS);
- descriptors.add(PROP_SSL_CONTEXT_SERVICE);
- descriptors.add(PROP_XPACK_LOCATION);
- descriptors.add(USERNAME);
- descriptors.add(PASSWORD);
- descriptors.add(PING_TIMEOUT);
- descriptors.add(SAMPLER_INTERVAL);
- descriptors.add(ID_ATTRIBUTE);
- descriptors.add(INDEX);
- descriptors.add(TYPE);
- descriptors.add(CHARSET);
- descriptors.add(BATCH_SIZE);
- descriptors.add(INDEX_OP);
-
- propertyDescriptors = Collections.unmodifiableList(descriptors);
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return relationships;
- }
-
- @Override
- public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return propertyDescriptors;
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-
- synchronized (esClient) {
- if(esClient.get() == null) {
- super.setup(context);
- }
- }
-
- final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue();
- final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-
- final List<FlowFile> flowFiles = session.get(batchSize);
- if (flowFiles.isEmpty()) {
- return;
- }
-
- final ComponentLog logger = getLogger();
- // Keep track of the list of flow files that need to be transferred. As they are transferred, remove them from the list.
- List<FlowFile> flowFilesToTransfer = new LinkedList<>(flowFiles);
- try {
- final BulkRequestBuilder bulk = esClient.get().prepareBulk();
-
- for (FlowFile file : flowFiles) {
- final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue();
- final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(file).getValue();
- final String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(file).getValue();
- final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(file).getValue());
-
-
- final String id = file.getAttribute(id_attribute);
- if (id == null) {
- logger.warn("No value in identifier attribute {} for {}, transferring to failure", new Object[]{id_attribute, file});
- flowFilesToTransfer.remove(file);
- session.transfer(file, REL_FAILURE);
- } else {
- session.read(file, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- // For the bulk insert, each document has to be on its own line, so remove all CRLF
- String json = IOUtils.toString(in, charset)
- .replace("\r\n", " ").replace('\n', ' ').replace('\r', ' ');
-
- if (indexOp.equalsIgnoreCase("index")) {
- bulk.add(esClient.get().prepareIndex(index, docType, id)
- .setSource(json.getBytes(charset)));
- } else if (indexOp.equalsIgnoreCase("upsert")) {
- bulk.add(esClient.get().prepareUpdate(index, docType, id)
- .setDoc(json.getBytes(charset))
- .setDocAsUpsert(true));
- } else if (indexOp.equalsIgnoreCase("update")) {
- bulk.add(esClient.get().prepareUpdate(index, docType, id)
- .setDoc(json.getBytes(charset)));
- } else {
- throw new IOException("Index operation: " + indexOp + " not supported.");
- }
- }
- });
- }
- }
-
- if (bulk.numberOfActions() > 0) {
- final BulkResponse response = bulk.execute().actionGet();
- if (response.hasFailures()) {
- // Responses are guaranteed to be in order, remove them in reverse order
- BulkItemResponse[] responses = response.getItems();
- if (responses != null && responses.length > 0) {
- for (int i = responses.length - 1; i >= 0; i--) {
- final BulkItemResponse item = responses[i];
- final FlowFile flowFile = flowFilesToTransfer.get(item.getItemId());
- if (item.isFailed()) {
- logger.warn("Failed to insert {} into Elasticsearch due to {}, transferring to failure",
- new Object[]{flowFile, item.getFailure().getMessage()});
- session.transfer(flowFile, REL_FAILURE);
-
- } else {
- session.getProvenanceReporter().send(flowFile, response.remoteAddress().getAddress());
- session.transfer(flowFile, REL_SUCCESS);
- }
- flowFilesToTransfer.remove(flowFile);
- }
- }
- }
-
- // Transfer any remaining flowfiles to success
- for (FlowFile ff : flowFilesToTransfer) {
- session.getProvenanceReporter().send(ff, response.remoteAddress().getAddress());
- session.transfer(ff, REL_SUCCESS);
- }
- }
-
- } catch (NoNodeAvailableException
- | ElasticsearchTimeoutException
- | ReceiveTimeoutTransportException
- | NodeClosedException exceptionToRetry) {
-
- // Authorization errors and other problems are often returned as NoNodeAvailableExceptions without a
- // traceable cause. However the cause seems to be logged, just not available to this caught exception.
- // Since the error message will show up as a bulletin, we make specific mention to check the logs for
- // more details.
- logger.error("Failed to insert into Elasticsearch due to {}. More detailed information may be available in " +
- "the NiFi logs.",
- new Object[]{exceptionToRetry.getLocalizedMessage()}, exceptionToRetry);
- session.transfer(flowFilesToTransfer, REL_RETRY);
- context.yield();
-
- } catch (Exception exceptionToFail) {
- logger.error("Failed to insert into Elasticsearch due to {}, transferring to failure",
- new Object[]{exceptionToFail.getLocalizedMessage()}, exceptionToFail);
-
- session.transfer(flowFilesToTransfer, REL_FAILURE);
- context.yield();
- }
- }
-
- /**
- * Dispose of ElasticSearch client
- */
- @OnStopped
- public void closeClient() {
- super.closeClient();
- }
-}
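
The indexing path removed above batches FlowFiles into a single bulk call; condensed from the deleted PutElasticsearch5 source (readContent is a hypothetical placeholder for the session.read callback in the original, and error handling is omitted):

    BulkRequestBuilder bulk = client.prepareBulk();
    for (FlowFile file : flowFiles) {
        // bulk entries must each be a single line, so CR/LF are collapsed to spaces
        String json = readContent(file).replace("\r\n", " ").replace('\n', ' ').replace('\r', ' ');
        bulk.add(client.prepareIndex(index, docType, id).setSource(json.getBytes(charset)));
    }
    BulkResponse response = bulk.execute().actionGet();
    // inspect response.getItems() to route each FlowFile to success or failure individually
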
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
deleted file mode 100644
index ba9da13c62..0000000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ /dev/null
@@ -1,17 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-org.apache.nifi.processors.elasticsearch.FetchElasticsearch5
-org.apache.nifi.processors.elasticsearch.PutElasticsearch5
-org.apache.nifi.processors.elasticsearch.DeleteElasticsearch5
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITDeleteElasticsearch5Test.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITDeleteElasticsearch5Test.java
deleted file mode 100644
index 0ce65a39c9..0000000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITDeleteElasticsearch5Test.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.elasticsearch;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.elasticsearch.action.delete.DeleteResponse;
-import org.elasticsearch.rest.RestStatus;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * Integration test for the delete processor. Please set the hosts, cluster name, index, type, etc. before running the integration tests.
- */
-@Ignore("Comment this out for es delete integration testing and set the appropriate cluster name, hosts, etc")
-public class ITDeleteElasticsearch5Test {
-
- private static final String TYPE1 = "type1";
- private static final String INDEX1 = "index1";
- protected DeleteResponse deleteResponse;
- protected RestStatus restStatus;
- private InputStream inputDocument;
- protected String clusterName = "elasticsearch";
- private String documentId;
-
- @Before
- public void setUp() throws IOException {
- ClassLoader classloader = Thread.currentThread().getContextClassLoader();
- inputDocument = classloader.getResourceAsStream("DocumentExample.json");
- long currentTimeMillis = System.currentTimeMillis();
- documentId = String.valueOf(currentTimeMillis);
- }
-
- @After
- public void teardown() {
- }
-
- @Test
- public void testPutAndDeleteIntegrationTestSuccess() {
- final TestRunner runnerPut = TestRunners.newTestRunner(new PutElasticsearch5());
- runnerPut.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, clusterName);
- runnerPut.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runnerPut.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runnerPut.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
-
- runnerPut.setProperty(PutElasticsearch5.INDEX, INDEX1);
- runnerPut.setProperty(PutElasticsearch5.BATCH_SIZE, "1");
-
- runnerPut.setProperty(PutElasticsearch5.TYPE, TYPE1);
- runnerPut.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "id");
- runnerPut.assertValid();
-
- runnerPut.enqueue(inputDocument, new HashMap<String, String>() {{
- put("id", documentId);
- }});
-
- runnerPut.enqueue(inputDocument);
- runnerPut.run(1, true, true);
-
- runnerPut.assertAllFlowFilesTransferred(PutElasticsearch5.REL_SUCCESS, 1);
-
- final TestRunner runnerDelete = TestRunners.newTestRunner(new DeleteElasticsearch5());
-
- runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, clusterName);
- runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
-
- runnerDelete.setProperty(DeleteElasticsearch5.INDEX, INDEX1);
-
- runnerDelete.setProperty(DeleteElasticsearch5.TYPE, TYPE1);
- runnerDelete.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}");
- runnerDelete.assertValid();
-
- runnerDelete.enqueue(new byte[] {}, new HashMap<String, String>() {{
- put("documentId", documentId);
- }});
-
- runnerDelete.enqueue(new byte [] {});
- runnerDelete.run(1, true, true);
-
- runnerDelete.assertAllFlowFilesTransferred(PutElasticsearch5.REL_SUCCESS, 1);
- }
-
- @Test
- public void testDeleteIntegrationTestDocumentNotFound() {
- final TestRunner runnerDelete = TestRunners.newTestRunner(new DeleteElasticsearch5());
-
- runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, clusterName);
- runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
-
- runnerDelete.setProperty(DeleteElasticsearch5.INDEX, INDEX1);
-
- runnerDelete.setProperty(DeleteElasticsearch5.TYPE, TYPE1);
- runnerDelete.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}");
- runnerDelete.assertValid();
-
- runnerDelete.enqueue(new byte[] {}, new HashMap<String, String>() {{
- put("documentId", documentId);
- }});
-
- runnerDelete.enqueue(new byte [] {});
- runnerDelete.run(1, true, true);
-
- runnerDelete.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_NOT_FOUND, 1);
- final MockFlowFile out = runnerDelete.getFlowFilesForRelationship(DeleteElasticsearch5.REL_NOT_FOUND).get(0);
- out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
- out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1);
- out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1);
- }
-
- @Test
- public void testDeleteIntegrationTestBadIndex() {
- final TestRunner runnerDelete = TestRunners.newTestRunner(new DeleteElasticsearch5());
-
- runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, clusterName);
- runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
-
- String index = String.valueOf(System.currentTimeMillis());
- runnerDelete.setProperty(DeleteElasticsearch5.INDEX, index);
-
- runnerDelete.setProperty(DeleteElasticsearch5.TYPE, TYPE1);
- runnerDelete.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}");
- runnerDelete.assertValid();
-
- runnerDelete.enqueue(new byte[] {}, new HashMap<String, String>() {{
- put("documentId", documentId);
- }});
-
- runnerDelete.enqueue(new byte [] {});
- runnerDelete.run(1, true, true);
- runnerDelete.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_NOT_FOUND, 1);
- final MockFlowFile out = runnerDelete.getFlowFilesForRelationship(DeleteElasticsearch5.REL_NOT_FOUND).get(0);
- out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
- out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, index);
- out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1);
- }
-
- @Test
- public void testDeleteIntegrationTestBadType() {
- final TestRunner runnerDelete = TestRunners.newTestRunner(new DeleteElasticsearch5());
-
- runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, clusterName);
- runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
-
- runnerDelete.setProperty(DeleteElasticsearch5.INDEX, INDEX1);
- String type = String.valueOf(System.currentTimeMillis());
- runnerDelete.setProperty(DeleteElasticsearch5.TYPE, type);
- runnerDelete.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}");
- runnerDelete.assertValid();
-
- runnerDelete.enqueue(new byte[] {}, new HashMap<String, String>() {{
- put("documentId", documentId);
- }});
-
- runnerDelete.enqueue(new byte [] {});
- runnerDelete.run(1, true, true);
-
- runnerDelete.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_NOT_FOUND, 1);
- final MockFlowFile out = runnerDelete.getFlowFilesForRelationship(DeleteElasticsearch5.REL_NOT_FOUND).get(0);
- out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
- out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1);
- out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, type);
- }
-}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestDeleteElasticsearch5.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestDeleteElasticsearch5.java
deleted file mode 100644
index a6c9811d38..0000000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestDeleteElasticsearch5.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.elasticsearch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.elasticsearch.ElasticsearchTimeoutException;
-import org.elasticsearch.action.delete.DeleteRequestBuilder;
-import org.elasticsearch.action.delete.DeleteResponse;
-import org.elasticsearch.rest.RestStatus;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestDeleteElasticsearch5 {
-
- private String documentId;
- private static final String TYPE1 = "type1";
- private static final String INDEX1 = "index1";
- private TestRunner runner;
- protected DeleteResponse deleteResponse;
- protected RestStatus restStatus;
- private DeleteElasticsearch5 mockDeleteProcessor;
- long currentTimeMillis;
-
- @Before
- public void setUp() throws IOException {
- currentTimeMillis = System.currentTimeMillis();
- documentId = String.valueOf(currentTimeMillis);
- mockDeleteProcessor = new DeleteElasticsearch5() {
-
- @Override
- protected DeleteRequestBuilder prepareDeleteRequest(String index, String docId, String docType) {
- return null;
- }
-
- @Override
- protected DeleteResponse doDelete(DeleteRequestBuilder requestBuilder)
- throws InterruptedException, ExecutionException {
- return deleteResponse;
- }
-
- @Override
- public void setup(ProcessContext context) {
- }
-
- };
-
- runner = TestRunners.newTestRunner(mockDeleteProcessor);
-
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
-
- runner.setProperty(DeleteElasticsearch5.INDEX, INDEX1);
- runner.assertNotValid();
- runner.setProperty(DeleteElasticsearch5.TYPE, TYPE1);
- runner.assertNotValid();
- runner.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}");
- runner.assertValid();
- }
-
- @After
- public void teardown() {
- runner = null;
- }
-
- @Test
- public void testDeleteWithNoDocumentId() throws IOException {
-
- runner.enqueue(new byte [] {});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1);
- final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0);
- assertNotNull(out);
- assertEquals("Document id is required but was empty",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
- }
-
- @Test
- public void testDeleteWithNoIndex() throws IOException {
- runner.setProperty(DeleteElasticsearch5.INDEX, "${index}");
-
- runner.enqueue(new byte [] {});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1);
- final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0);
- assertNotNull(out);
- assertEquals("Index is required but was empty",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
- }
-
- @Test
- public void testDeleteWithNoType() throws IOException {
- runner.setProperty(DeleteElasticsearch5.TYPE, "${type}");
-
- runner.enqueue(new byte [] {});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1);
- final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0);
- assertNotNull(out);
- assertEquals("Document type is required but was empty",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
- }
-
- @Test
- public void testDeleteSuccessful() throws IOException {
- restStatus = RestStatus.OK;
- deleteResponse = new DeleteResponse(null, TYPE1, documentId, 1, true) {
-
- @Override
- public RestStatus status() {
- return restStatus;
- }
-
- };
- runner.enqueue(new byte [] {}, new HashMap<String, String>() {{
- put("documentId", documentId);
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_SUCCESS, 1);
- final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_SUCCESS).get(0);
- assertNotNull(out);
- assertEquals(null,out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
- out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
- out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1);
- out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1);
- }
-
- @Test
- public void testDeleteNotFound() throws IOException {
- restStatus = RestStatus.NOT_FOUND;
- deleteResponse = new DeleteResponse(null, TYPE1, documentId, 1, true) {
-
- @Override
- public RestStatus status() {
- return restStatus;
- }
-
- };
- runner.enqueue(new byte [] {}, new HashMap<String, String>() {{
- put("documentId", documentId);
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_NOT_FOUND, 1);
- final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_NOT_FOUND).get(0);
- assertNotNull(out);
- assertEquals(DeleteElasticsearch5.UNABLE_TO_DELETE_DOCUMENT_MESSAGE,out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
- out.assertAttributeEquals(DeleteElasticsearch5.ES_REST_STATUS, restStatus.toString());
- out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
- out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1);
- out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1);
- }
-
- @Test
- public void testDeleteServerFailure() throws IOException {
- restStatus = RestStatus.SERVICE_UNAVAILABLE;
- deleteResponse = new DeleteResponse(null, TYPE1, documentId, 1, true) {
-
- @Override
- public RestStatus status() {
- return restStatus;
- }
-
- };
- runner.enqueue(new byte [] {}, new HashMap<String, String>() {{
- put("documentId", documentId);
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1);
- final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0);
- assertNotNull(out);
- assertEquals(DeleteElasticsearch5.UNABLE_TO_DELETE_DOCUMENT_MESSAGE,out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
- out.assertAttributeEquals(DeleteElasticsearch5.ES_REST_STATUS, restStatus.toString());
- out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
- out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1);
- out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1);
- }
-
- @Test
- public void testDeleteRetryableException() throws IOException {
- mockDeleteProcessor = new DeleteElasticsearch5() {
-
- @Override
- protected DeleteRequestBuilder prepareDeleteRequest(String index, String docId, String docType) {
- return null;
- }
-
- @Override
- protected DeleteResponse doDelete(DeleteRequestBuilder requestBuilder)
- throws InterruptedException, ExecutionException {
- throw new ElasticsearchTimeoutException("timeout");
- }
-
- @Override
- public void setup(ProcessContext context) {
- }
-
- };
- runner = TestRunners.newTestRunner(mockDeleteProcessor);
-
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
-
- runner.setProperty(DeleteElasticsearch5.INDEX, INDEX1);
- runner.setProperty(DeleteElasticsearch5.TYPE, TYPE1);
- runner.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}");
- runner.assertValid();
-
- runner.enqueue(new byte [] {}, new HashMap<String, String>() {{
- put("documentId", documentId);
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_RETRY, 1);
- final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_RETRY).get(0);
- assertNotNull(out);
- assertEquals("timeout",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
- out.assertAttributeEquals(DeleteElasticsearch5.ES_REST_STATUS, null);
- out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
- out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1);
- out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1);
- }
-
- @Test
- public void testDeleteNonRetryableException() throws IOException {
- mockDeleteProcessor = new DeleteElasticsearch5() {
-
- @Override
- protected DeleteRequestBuilder prepareDeleteRequest(String index, String docId, String docType) {
- return null;
- }
-
- @Override
- protected DeleteResponse doDelete(DeleteRequestBuilder requestBuilder)
- throws InterruptedException, ExecutionException {
- throw new InterruptedException("exception");
- }
-
- @Override
- public void setup(ProcessContext context) {
- }
-
- };
- runner = TestRunners.newTestRunner(mockDeleteProcessor);
-
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
-
- runner.setProperty(DeleteElasticsearch5.INDEX, INDEX1);
- runner.setProperty(DeleteElasticsearch5.TYPE, TYPE1);
- runner.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}");
- runner.assertValid();
-
- runner.enqueue(new byte [] {}, new HashMap<String, String>() {{
- put("documentId", documentId);
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1);
- final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0);
- assertNotNull(out);
- assertEquals("exception",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
- out.assertAttributeEquals(DeleteElasticsearch5.ES_REST_STATUS, null);
- out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
- out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1);
- out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1);
- }
-
-}
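
The unit tests removed above avoid any network access by overriding the processor's protected hooks; the pattern, condensed from TestDeleteElasticsearch5:

    DeleteElasticsearch5 mockDeleteProcessor = new DeleteElasticsearch5() {
        @Override
        protected DeleteRequestBuilder prepareDeleteRequest(String index, String docId, String docType) {
            return null; // no real request is built
        }
        @Override
        protected DeleteResponse doDelete(DeleteRequestBuilder requestBuilder) {
            return deleteResponse; // canned response prepared by the test
        }
        @Override
        public void setup(ProcessContext context) {
            // skip transport client creation entirely
        }
    };
    TestRunner runner = TestRunners.newTestRunner(mockDeleteProcessor);
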
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch5.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch5.java
deleted file mode 100644
index 7625d88f5e..0000000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch5.java
+++ /dev/null
@@ -1,453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.elasticsearch;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.MockProcessContext;
-import org.apache.nifi.util.MockProcessorInitializationContext;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.elasticsearch.ElasticsearchParseException;
-import org.elasticsearch.ElasticsearchTimeoutException;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ListenableActionFuture;
-import org.elasticsearch.action.get.GetAction;
-import org.elasticsearch.action.get.GetRequestBuilder;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.support.AdapterActionFuture;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.NoNodeAvailableException;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.node.NodeClosedException;
-import org.elasticsearch.transport.ReceiveTimeoutTransportException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-public class TestFetchElasticsearch5 {
-
- private InputStream docExample;
- private TestRunner runner;
-
- @Before
- public void setUp() throws IOException {
- ClassLoader classloader = Thread.currentThread().getContextClassLoader();
- docExample = classloader.getResourceAsStream("DocumentExample.json");
-
- }
-
- @After
- public void teardown() {
- runner = null;
- }
-
- @Test
- public void testFetchElasticsearch5OnTrigger() throws IOException {
- runner = TestRunners.newTestRunner(new FetchElasticsearch5TestProcessor(true)); // all docs are found
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
-
- runner.setProperty(FetchElasticsearch5.INDEX, "doc");
- runner.assertNotValid();
- runner.setProperty(FetchElasticsearch5.TYPE, "status");
- runner.assertNotValid();
- runner.setProperty(FetchElasticsearch5.DOC_ID, "${doc_id}");
- runner.assertValid();
-
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652140");
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_SUCCESS, 1);
- final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearch5.REL_SUCCESS).get(0);
- assertNotNull(out);
- out.assertAttributeEquals("doc_id", "28039652140");
- }
-
- @Test
- public void testFetchElasticsearch5OnTriggerEL() throws IOException {
- runner = TestRunners.newTestRunner(new FetchElasticsearch5TestProcessor(true)); // all docs are found
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "${cluster.name}");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "${hosts}");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "${ping.timeout}");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "${sampler.interval}");
-
- runner.setProperty(FetchElasticsearch5.INDEX, "doc");
- runner.assertNotValid();
- runner.setProperty(FetchElasticsearch5.TYPE, "status");
- runner.assertNotValid();
- runner.setProperty(FetchElasticsearch5.DOC_ID, "${doc_id}");
- runner.assertValid();
- runner.setVariable("cluster.name", "elasticsearch");
- runner.setVariable("hosts", "127.0.0.1:9300");
- runner.setVariable("ping.timeout", "5s");
- runner.setVariable("sampler.interval", "5s");
-
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652140");
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_SUCCESS, 1);
- final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearch5.REL_SUCCESS).get(0);
- assertNotNull(out);
- out.assertAttributeEquals("doc_id", "28039652140");
- }
-
- @Test
- public void testFetchElasticsearch5OnTriggerWithFailures() throws IOException {
- runner = TestRunners.newTestRunner(new FetchElasticsearch5TestProcessor(false)); // simulate doc not found
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
- runner.setProperty(FetchElasticsearch5.INDEX, "doc");
- runner.setProperty(FetchElasticsearch5.TYPE, "status");
- runner.setProperty(FetchElasticsearch5.DOC_ID, "${doc_id}");
-
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652140");
- }});
- runner.run(1, true, true);
-
- // This test generates a "document not found"
- runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_NOT_FOUND, 1);
- final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearch5.REL_NOT_FOUND).get(0);
- assertNotNull(out);
- out.assertAttributeEquals("doc_id", "28039652140");
- }
-
- @Test
- public void testFetchElasticsearch5WithBadHosts() throws IOException {
- runner = TestRunners.newTestRunner(new FetchElasticsearch5TestProcessor(false)); // simulate doc not found
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "http://127.0.0.1:9300,127.0.0.2:9300");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
- runner.setProperty(FetchElasticsearch5.INDEX, "doc");
- runner.setProperty(FetchElasticsearch5.TYPE, "status");
- runner.setProperty(FetchElasticsearch5.DOC_ID, "${doc_id}");
-
- runner.assertNotValid();
- }
-
- @Test
- public void testFetchElasticsearch5OnTriggerWithExceptions() throws IOException {
- FetchElasticsearch5TestProcessor processor = new FetchElasticsearch5TestProcessor(true);
- runner = TestRunners.newTestRunner(processor);
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
- runner.setProperty(FetchElasticsearch5.INDEX, "doc");
- runner.setProperty(FetchElasticsearch5.TYPE, "status");
- runner.setProperty(FetchElasticsearch5.DOC_ID, "${doc_id}");
-
- // No Node Available exception
- processor.setExceptionToThrow(new NoNodeAvailableException("test"));
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652140");
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
- runner.clearTransferState();
-
- // Elasticsearch5 Timeout exception
- processor.setExceptionToThrow(new ElasticsearchTimeoutException("test"));
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652141");
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
- runner.clearTransferState();
-
- // Receive Timeout Transport exception
- processor.setExceptionToThrow(new ReceiveTimeoutTransportException(mock(StreamInput.class)));
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652141");
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
- runner.clearTransferState();
-
- // Node Closed exception
- processor.setExceptionToThrow(new NodeClosedException(mock(StreamInput.class)));
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652141");
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
- runner.clearTransferState();
-
- // Elasticsearch5 Parse exception
- processor.setExceptionToThrow(new ElasticsearchParseException("test"));
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652141");
- }});
- runner.run(1, true, true);
-
- // This test generates an exception on execute() and routes to failure
- runner.assertTransferCount(FetchElasticsearch5.REL_FAILURE, 1);
- }
-
- @Test(expected = ProcessException.class)
- public void testCreateElasticsearch5ClientWithException() throws ProcessException {
- FetchElasticsearch5TestProcessor processor = new FetchElasticsearch5TestProcessor(true) {
- @Override
- protected Client getTransportClient(Settings.Builder settingsBuilder, String xPackPath,
- String username, String password,
- List<InetSocketAddress> esHosts, ComponentLog log)
- throws MalformedURLException {
- throw new MalformedURLException();
- }
- };
-
- MockProcessContext context = new MockProcessContext(processor);
- processor.initialize(new MockProcessorInitializationContext(processor, context));
- processor.callCreateElasticsearchClient(context);
- }
-
- @Test
- public void testSetupSecureClient() throws Exception {
- FetchElasticsearch5TestProcessor processor = new FetchElasticsearch5TestProcessor(true);
- runner = TestRunners.newTestRunner(processor);
- SSLContextService sslService = mock(SSLContextService.class);
- when(sslService.getIdentifier()).thenReturn("ssl-context");
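- // The mocked SSL context service only needs an identifier so it can be registered with the runner and referenced by the SSL Context Service property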
- runner.addControllerService("ssl-context", sslService);
- runner.enableControllerService(sslService);
- runner.setProperty(FetchElasticsearch5.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
- runner.setProperty(FetchElasticsearch5.INDEX, "doc");
- runner.setProperty(FetchElasticsearch5.TYPE, "status");
- runner.setProperty(FetchElasticsearch5.DOC_ID, "${doc_id}");
-
- // Allow time for the controller service to fully initialize
- Thread.sleep(500);
-
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652140");
- }});
- runner.run(1, true, true);
-
- }
-
- /**
- * A Test class that extends the processor in order to inject/mock behavior
- */
- private static class FetchElasticsearch5TestProcessor extends FetchElasticsearch5 {
- boolean documentExists = true;
- Exception exceptionToThrow = null;
-
- public FetchElasticsearch5TestProcessor(boolean documentExists) {
- this.documentExists = documentExists;
- }
-
- public void setExceptionToThrow(Exception exceptionToThrow) {
- this.exceptionToThrow = exceptionToThrow;
- }
-
- @Override
- protected Client getTransportClient(Settings.Builder settingsBuilder, String xPackPath,
- String username, String password,
- List<InetSocketAddress> esHosts, ComponentLog log)
- throws MalformedURLException {
- TransportClient mockClient = mock(TransportClient.class);
- GetRequestBuilder getRequestBuilder = spy(new GetRequestBuilder(mockClient, GetAction.INSTANCE));
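- // Stub execute() on the spied request builder: either throw the configured exception or return the canned mock executor, so no real transport call is made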
- if (exceptionToThrow != null) {
- doThrow(exceptionToThrow).when(getRequestBuilder).execute();
- } else {
- doReturn(new MockGetRequestBuilderExecutor(documentExists, esHosts.get(0))).when(getRequestBuilder).execute();
- }
- when(mockClient.prepareGet(anyString(), anyString(), anyString())).thenReturn(getRequestBuilder);
-
- return mockClient;
- }
-
- public void callCreateElasticsearchClient(ProcessContext context) {
- createElasticsearchClient(context);
- }
-
- private static class MockGetRequestBuilderExecutor
- extends AdapterActionFuture<GetResponse, ActionListener<GetResponse>>
- implements ListenableActionFuture<GetResponse> {
-
- boolean documentExists = true;
- InetSocketAddress address = null;
-
- public MockGetRequestBuilderExecutor(boolean documentExists, InetSocketAddress address) {
- this.documentExists = documentExists;
- this.address = address;
- }
-
-
- @Override
- protected GetResponse convert(ActionListener<GetResponse> bulkResponseActionListener) {
- return null;
- }
-
- @Override
- public void addListener(ActionListener<GetResponse> actionListener) {
-
- }
-
- @Override
- public GetResponse get() throws InterruptedException, ExecutionException {
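- // Fabricate a GetResponse whose existence flag comes from the constructor argument and whose source is always the string "Success"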
- GetResponse response = mock(GetResponse.class);
- when(response.isExists()).thenReturn(documentExists);
- when(response.getSourceAsBytes()).thenReturn("Success".getBytes());
- when(response.getSourceAsString()).thenReturn("Success");
- TransportAddress remoteAddress = mock(TransportAddress.class);
- when(remoteAddress.getAddress()).thenReturn(address.toString());
- when(response.remoteAddress()).thenReturn(remoteAddress);
- return response;
- }
-
- @Override
- public GetResponse actionGet() {
- try {
- return get();
- } catch (Exception e) {
- fail(e.getMessage());
- }
- return null;
- }
- }
- }
-
-
- /////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Integration test section below
- //
- // The tests below are meant to run against real ES instances, and are thus annotated with @Ignore during
- // normal test execution. However, if you wish to execute them as part of a test phase, comment out the
- // @Ignore annotation for each desired test.
- /////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
- /**
- * Tests basic ES functionality against a local or test ES cluster
- */
- @Test
- @Ignore("Comment this out if you want to run against local or test ES")
- public void testFetchElasticsearch5Basic() {
- System.out.println("Starting test " + new Object() {
- }.getClass().getEnclosingMethod().getName());
- final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearch5());
-
- // Local cluster - Elasticsearch installed on macOS via Homebrew
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch_brew");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
-
- runner.setProperty(FetchElasticsearch5.INDEX, "doc");
-
- runner.setProperty(FetchElasticsearch5.TYPE, "status");
- runner.setProperty(FetchElasticsearch5.DOC_ID, "${doc_id}");
- runner.assertValid();
-
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652140");
- }});
-
-
- runner.enqueue(docExample);
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_SUCCESS, 1);
- }
-
- @Test
- @Ignore("Comment this out if you want to run against local or test ES")
- public void testFetchElasticsearch5Batch() throws IOException {
- System.out.println("Starting test " + new Object() {
- }.getClass().getEnclosingMethod().getName());
- final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearch5());
-
- // Local cluster - Elasticsearch installed on macOS via Homebrew
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch_brew");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
- runner.setProperty(FetchElasticsearch5.INDEX, "doc");
-
- runner.setProperty(FetchElasticsearch5.TYPE, "status");
- runner.setProperty(FetchElasticsearch5.DOC_ID, "${doc_id}");
- runner.assertValid();
-
-
- String message = convertStreamToString(docExample);
- for (int i = 0; i < 100; i++) {
-
- long newId = 28039652140L + i;
- final String newStrId = Long.toString(newId);
- runner.enqueue(message.getBytes(), new HashMap<String, String>() {{
- put("doc_id", newStrId);
- }});
-
- }
-
- runner.run();
-
- runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_SUCCESS, 100);
- }
-
- /**
- * Convert an input stream to a string
- *
- * @param is the input stream to convert
- * @return the contents of the input stream as a string
- */
- static String convertStreamToString(InputStream is) {
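- // The "\\A" delimiter tells the Scanner to read the entire stream as a single token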
- java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
- return s.hasNext() ? s.next() : "";
- }
-}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch5.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch5.java
deleted file mode 100644
index 34ed699cd3..0000000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch5.java
+++ /dev/null
@@ -1,513 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.elasticsearch;
-
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.elasticsearch.ElasticsearchParseException;
-import org.elasticsearch.ElasticsearchTimeoutException;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ListenableActionFuture;
-import org.elasticsearch.action.bulk.BulkAction;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexAction;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.support.AdapterActionFuture;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.NoNodeAvailableException;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.node.NodeClosedException;
-import org.elasticsearch.transport.ReceiveTimeoutTransportException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-public class TestPutElasticsearch5 {
-
- private InputStream docExample;
- private TestRunner runner;
-
- @Before
- public void setUp() throws IOException {
- ClassLoader classloader = Thread.currentThread().getContextClassLoader();
- docExample = classloader.getResourceAsStream("DocumentExample.json");
- }
-
- @After
- public void teardown() {
- runner = null;
- }
-
- @Test
- public void testPutElasticSearchOnTrigger() throws IOException {
- runner = TestRunners.newTestRunner(new PutElasticsearch5TestProcessor(false)); // no failures
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
-
- runner.setProperty(PutElasticsearch5.INDEX, "doc");
- runner.assertNotValid();
- runner.setProperty(PutElasticsearch5.TYPE, "status");
- runner.setProperty(PutElasticsearch5.BATCH_SIZE, "1");
- runner.assertNotValid();
- runner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id");
- runner.assertValid();
-
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652140");
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_SUCCESS, 1);
- final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch5.REL_SUCCESS).get(0);
- assertNotNull(out);
- out.assertAttributeEquals("doc_id", "28039652140");
- }
-
- @Test
- public void testPutElasticSearchOnTriggerEL() throws IOException {
- runner = TestRunners.newTestRunner(new PutElasticsearch5TestProcessor(false)); // no failures
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "${cluster.name}");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "${hosts}");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "${ping.timeout}");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "${sampler.interval}");
-
- runner.setProperty(PutElasticsearch5.INDEX, "doc");
- runner.assertNotValid();
- runner.setProperty(PutElasticsearch5.TYPE, "status");
- runner.setProperty(PutElasticsearch5.BATCH_SIZE, "1");
- runner.assertNotValid();
- runner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id");
- runner.assertValid();
- runner.setVariable("cluster.name", "elasticsearch");
- runner.setVariable("hosts", "127.0.0.1:9300");
- runner.setVariable("ping.timeout", "5s");
- runner.setVariable("sampler.interval", "5s");
-
-
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652140");
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_SUCCESS, 1);
- final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch5.REL_SUCCESS).get(0);
- assertNotNull(out);
- out.assertAttributeEquals("doc_id", "28039652140");
- }
-
- @Test
- public void testPutElasticSearchOnTriggerBadDocIdentifier() throws IOException {
- runner = TestRunners.newTestRunner(new PutElasticsearch5TestProcessor(false)); // no failures
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
-
- runner.setProperty(PutElasticsearch5.INDEX, "doc");
- runner.assertNotValid();
- runner.setProperty(PutElasticsearch5.TYPE, "status");
- runner.setProperty(PutElasticsearch5.BATCH_SIZE, "1");
- runner.assertNotValid();
- runner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id2");
- runner.assertValid();
-
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652140");
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_FAILURE, 1);
- }
-
- @Test
- public void testPutElasticSearchOnTriggerWithFailures() throws IOException {
- runner = TestRunners.newTestRunner(new PutElasticsearch5TestProcessor(true)); // simulate failures
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
- runner.setProperty(PutElasticsearch5.INDEX, "doc");
- runner.setProperty(PutElasticsearch5.TYPE, "status");
- runner.setProperty(PutElasticsearch5.BATCH_SIZE, "1");
- runner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id");
-
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652140");
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_FAILURE, 1);
- final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch5.REL_FAILURE).get(0);
- assertNotNull(out);
- out.assertAttributeEquals("doc_id", "28039652140");
- }
-
- @Test
- public void testPutElasticsearch5OnTriggerWithExceptions() throws IOException {
- PutElasticsearch5TestProcessor processor = new PutElasticsearch5TestProcessor(false);
- runner = TestRunners.newTestRunner(processor);
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
- runner.setProperty(PutElasticsearch5.INDEX, "doc");
- runner.setProperty(PutElasticsearch5.TYPE, "status");
- runner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id");
-
- // No Node Available exception
- processor.setExceptionToThrow(new NoNodeAvailableException("test"));
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652140");
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
- runner.clearTransferState();
-
- // Elasticsearch5 Timeout exception
- processor.setExceptionToThrow(new ElasticsearchTimeoutException("test"));
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652141");
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
- runner.clearTransferState();
-
- // Receive Timeout Transport exception
- processor.setExceptionToThrow(new ReceiveTimeoutTransportException(mock(StreamInput.class)));
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652142");
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
- runner.clearTransferState();
-
- // Node Closed exception
- processor.setExceptionToThrow(new NodeClosedException(mock(StreamInput.class)));
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652143");
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(FetchElasticsearch5.REL_RETRY, 1);
- runner.clearTransferState();
-
- // Elasticsearch5 Parse exception
- processor.setExceptionToThrow(new ElasticsearchParseException("test"));
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652144");
- }});
- runner.run(1, true, true);
-
- // This test generates an exception on execute() and routes to failure
- runner.assertTransferCount(PutElasticsearch5.REL_FAILURE, 1);
- }
-
- @Test
- public void testPutElasticsearch5OnTriggerWithNoIdAttribute() throws IOException {
- runner = TestRunners.newTestRunner(new PutElasticsearch5TestProcessor(true)); // simulate failures
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
- runner.setProperty(PutElasticsearch5.INDEX, "doc");
- runner.setProperty(PutElasticsearch5.TYPE, "status");
- runner.setProperty(PutElasticsearch5.BATCH_SIZE, "1");
- runner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id");
-
- runner.enqueue(docExample);
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_FAILURE, 1);
- final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch5.REL_FAILURE).get(0);
- assertNotNull(out);
- }
-
- @Test
- public void testPutElasticsearch5OnTriggerWithIndexFromAttribute() throws IOException {
- runner = TestRunners.newTestRunner(new PutElasticsearch5TestProcessor(false));
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
- runner.setProperty(PutElasticsearch5.INDEX, "${i}");
- runner.setProperty(PutElasticsearch5.TYPE, "${type}");
- runner.setProperty(PutElasticsearch5.BATCH_SIZE, "1");
- runner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id");
-
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652144");
- put("i", "doc");
- put("type", "status");
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_SUCCESS, 1);
- final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch5.REL_SUCCESS).get(0);
- assertNotNull(out);
- runner.clearTransferState();
-
- // Now try with the index attribute missing; the resulting empty index value should route the flow file to retry
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652144");
- put("type", "status");
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_RETRY, 1);
- final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutElasticsearch5.REL_RETRY).get(0);
- assertNotNull(out2);
- }
-
- @Test
- public void testPutElasticSearchOnTriggerWithInvalidIndexOp() throws IOException {
- runner = TestRunners.newTestRunner(new PutElasticsearch5TestProcessor(false)); // no failures
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
-
- runner.setProperty(PutElasticsearch5.INDEX, "doc");
- runner.assertNotValid();
- runner.setProperty(PutElasticsearch5.TYPE, "status");
- runner.setProperty(PutElasticsearch5.BATCH_SIZE, "1");
- runner.assertNotValid();
- runner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id");
- runner.assertValid();
-
- runner.setProperty(PutElasticsearch5.INDEX_OP, "index_fail");
- runner.assertValid();
-
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652140");
- }});
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_FAILURE, 1);
- final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch5.REL_FAILURE).get(0);
- assertNotNull(out);
- }
-
- /**
- * A Test class that extends the processor in order to inject/mock behavior
- */
- private static class PutElasticsearch5TestProcessor extends PutElasticsearch5 {
- boolean responseHasFailures = false;
- Exception exceptionToThrow = null;
-
- public PutElasticsearch5TestProcessor(boolean responseHasFailures) {
- this.responseHasFailures = responseHasFailures;
- }
-
- public void setExceptionToThrow(Exception exceptionToThrow) {
- this.exceptionToThrow = exceptionToThrow;
- }
-
-
- @Override
- protected Client getTransportClient(Settings.Builder settingsBuilder, String xPackPath,
- String username, String password,
- List<InetSocketAddress> esHosts, ComponentLog log)
- throws MalformedURLException {
- final Client mockClient = mock(Client.class);
- BulkRequestBuilder bulkRequestBuilder = spy(new BulkRequestBuilder(mockClient, BulkAction.INSTANCE));
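- // Stub execute() on the spied bulk request builder so no transport connection is attempted; bulk failures are simulated by the mock executor below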
- if (exceptionToThrow != null) {
- doThrow(exceptionToThrow).when(bulkRequestBuilder).execute();
- } else {
- doReturn(new MockBulkRequestBuilderExecutor(responseHasFailures, esHosts.get(0))).when(bulkRequestBuilder).execute();
- }
- when(mockClient.prepareBulk()).thenReturn(bulkRequestBuilder);
-
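- // Simulate index-request validation: an empty index or type argument triggers a NoNodeAvailableException, otherwise an IndexRequestBuilder backed by the mock client is returned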
- when(mockClient.prepareIndex(anyString(), anyString(), anyString())).thenAnswer(new Answer<IndexRequestBuilder>() {
- @Override
- public IndexRequestBuilder answer(InvocationOnMock invocationOnMock) throws Throwable {
- Object[] args = invocationOnMock.getArguments();
- String arg1 = (String) args[0];
- if (arg1.isEmpty()) {
- throw new NoNodeAvailableException("Needs index");
- }
- String arg2 = (String) args[1];
- if (arg2.isEmpty()) {
- throw new NoNodeAvailableException("Needs doc type");
- } else {
- IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(mockClient, IndexAction.INSTANCE);
- return indexRequestBuilder;
- }
- }
- });
-
- return mockClient;
- }
-
- private static class MockBulkRequestBuilderExecutor
- extends AdapterActionFuture<BulkResponse, ActionListener<BulkResponse>>
- implements ListenableActionFuture<BulkResponse> {
-
- boolean responseHasFailures = false;
- InetSocketAddress address = null;
-
- public MockBulkRequestBuilderExecutor(boolean responseHasFailures, InetSocketAddress address) {
- this.responseHasFailures = responseHasFailures;
- this.address = address;
- }
-
- @Override
- protected BulkResponse convert(ActionListener<BulkResponse> bulkResponseActionListener) {
- return null;
- }
-
- @Override
- public void addListener(ActionListener<BulkResponse> actionListener) {
-
- }
-
- @Override
- public BulkResponse get() throws InterruptedException, ExecutionException {
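- // Fabricate a BulkResponse whose hasFailures() flag is configurable and which always reports a single failed item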
- BulkResponse response = mock(BulkResponse.class);
- when(response.hasFailures()).thenReturn(responseHasFailures);
- BulkItemResponse item = mock(BulkItemResponse.class);
- when(item.getItemId()).thenReturn(1);
- when(item.isFailed()).thenReturn(true);
- when(response.getItems()).thenReturn(new BulkItemResponse[]{item});
- TransportAddress remoteAddress = mock(TransportAddress.class);
- when(remoteAddress.getAddress()).thenReturn(address.toString());
- when(response.remoteAddress()).thenReturn(remoteAddress);
- return response;
- }
-
- }
- }
-
-
- /////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Integration test section below
- //
- // The tests below are meant to run against real ES instances, and are thus annotated with @Ignore during
- // normal test execution. However, if you wish to execute them as part of a test phase, comment out the
- // @Ignore annotation for each desired test.
- /////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
- /**
- * Tests basic ES functionality against a local or test ES cluster
- */
- @Test
- @Ignore("Comment this out if you want to run against local or test ES")
- public void testPutElasticSearchBasic() {
- System.out.println("Starting test " + new Object() {
- }.getClass().getEnclosingMethod().getName());
- final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearch5());
-
- // Local cluster - Elasticsearch installed on macOS via Homebrew
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch_brew");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
-
- runner.setProperty(PutElasticsearch5.INDEX, "doc");
- runner.setProperty(PutElasticsearch5.BATCH_SIZE, "1");
-
- runner.setProperty(PutElasticsearch5.TYPE, "status");
- runner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id");
- runner.assertValid();
-
- runner.enqueue(docExample, new HashMap<String, String>() {{
- put("doc_id", "28039652140");
- }});
-
-
- runner.enqueue(docExample);
- runner.run(1, true, true);
-
- runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_SUCCESS, 1);
- }
-
- @Test
- @Ignore("Comment this out if you want to run against local or test ES")
- public void testPutElasticSearchBatch() throws IOException {
- System.out.println("Starting test " + new Object() {
- }.getClass().getEnclosingMethod().getName());
- final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearch5());
-
- // Local cluster - Elasticsearch installed on macOS via Homebrew
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch_brew");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
- runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
- runner.setProperty(PutElasticsearch5.INDEX, "doc");
- runner.setProperty(PutElasticsearch5.BATCH_SIZE, "100");
-
- runner.setProperty(PutElasticsearch5.TYPE, "status");
- runner.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "doc_id");
- runner.assertValid();
-
-
- String message = convertStreamToString(docExample);
- for (int i = 0; i < 100; i++) {
-
- long newId = 28039652140L + i;
- final String newStrId = Long.toString(newId);
- runner.enqueue(message.getBytes(), new HashMap<String, String>() {{
- put("doc_id", newStrId);
- }});
-
- }
-
- runner.run();
-
- runner.assertAllFlowFilesTransferred(PutElasticsearch5.REL_SUCCESS, 100);
- }
-
- /**
- * Convert an input stream to a string
- *
- * @param is the input stream to convert
- * @return the contents of the input stream as a string
- */
- static String convertStreamToString(java.io.InputStream is) {
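- // The "\\A" delimiter tells the Scanner to read the entire stream as a single token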
- java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
- return s.hasNext() ? s.next() : "";
- }
-}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/resources/DocumentExample.json b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/resources/DocumentExample.json
deleted file mode 100644
index 66449cf1e1..0000000000
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/resources/DocumentExample.json
+++ /dev/null
@@ -1,21 +0,0 @@
-{
- "created_at": "Thu Jan 21 16:02:46 +0000 2016",
- "text": "This is a test document from a mock social media service",
- "contributors": null,
- "id": 28039652140,
- "shares": null,
- "geographic_location": null,
- "userinfo": {
- "name": "Not A. Person",
- "location": "Orlando, FL",
- "created_at": "Fri Oct 24 23:22:09 +0000 2008",
- "follow_count": 1,
- "url": "http://not.a.real.site",
- "id": 16958875,
- "lang": "en",
- "time_zone": "Mountain Time (US & Canada)",
- "description": "I'm a test person.",
- "following_count": 71,
- "screen_name": "Nobody"
- }
-}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
index e95d414d00..37703de3fd 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -29,8 +29,6 @@ language governing permissions and limitations under the License. -->
 <module>nifi-elasticsearch-client-service-nar</module>
 <module>nifi-elasticsearch-nar</module>
 <module>nifi-elasticsearch-processors</module>
- <module>nifi-elasticsearch-5-nar</module>
- <module>nifi-elasticsearch-5-processors</module>
 <module>nifi-elasticsearch-restapi-nar</module>
 <module>nifi-elasticsearch-restapi-processors</module>
@@ -48,11 +46,6 @@ language governing permissions and limitations under the License. -->
 <artifactId>nifi-elasticsearch-processors</artifactId>
 <version>1.16.0-SNAPSHOT</version>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-elasticsearch-5-processors</artifactId>
- <version>1.16.0-SNAPSHOT</version>
- </dependency>
 <groupId>org.apache.nifi</groupId>
 <artifactId>nifi-elasticsearch-restapi-processors</artifactId>