From aa947e4d3e0e54877d24683fefa5da651332c433 Mon Sep 17 00:00:00 2001 From: Mike Thomsen Date: Sat, 3 Mar 2018 17:32:42 -0500 Subject: [PATCH] NIFI-4325 Added new processor that uses the JSON DSL. NIFI-4325 Cleaned up how ElasticSearch client service builds SSLContext, added query attribute to flowfiles and other changes requested in a code review. This closes #2113. Signed-off-by: Joe Percivall --- nifi-assembly/pom.xml | 12 + .../nifi-elasticsearch-5-processors/pom.xml | 5 + .../org.apache.nifi.processor.Processor | 2 +- .../pom.xml | 47 ++ .../src/main/resources/META-INF/LICENSE | 285 ++++++++++++ .../src/main/resources/META-INF/NOTICE | 429 ++++++++++++++++++ .../nifi-elasticsearch-client-service/pom.xml | 163 +++++++ .../ElasticSearchClientServiceImpl.java | 259 +++++++++++ ...g.apache.nifi.controller.ControllerService | 15 + .../ElasticSearch5ClientService_IT.java | 105 +++++ .../TestControllerServiceProcessor.java | 49 ++ .../src/test/resources/setup.script | 32 ++ .../nifi-elasticsearch-restapi-nar/pom.xml | 43 ++ .../src/main/resources/META-INF/LICENSE | 285 ++++++++++++ .../src/main/resources/META-INF/NOTICE | 429 ++++++++++++++++++ .../pom.xml | 116 +++++ .../elasticsearch/JsonQueryElasticsearch.java | 337 ++++++++++++++ .../org.apache.nifi.processor.Processor | 15 + .../JsonQueryElasticsearchTest.java | 232 ++++++++++ .../TestElasticSearchClientService.java | 206 +++++++++ .../src/test/resources/DocumentExample.json | 21 + .../src/test/resources/log4j.properties | 22 + .../nifi-elasticsearch-bundle/pom.xml | 11 +- .../pom.xml | 48 ++ .../ElasticSearchClientService.java | 114 +++++ .../nifi/elasticsearch/SearchResponse.java | 67 +++ .../nifi-standard-services-api-nar/pom.xml | 6 + .../nifi-standard-services/pom.xml | 1 + pom.xml | 28 +- 29 files changed, 3368 insertions(+), 16 deletions(-) create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-nar/src/main/resources/META-INF/LICENSE create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.java create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/TestControllerServiceProcessor.java create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup.script create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-nar/src/main/resources/META-INF/LICENSE create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.java create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/DocumentExample.json create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/log4j.properties create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-elasticsearch-client-service-api/pom.xml create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/SearchResponse.java diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 5e5eef3c60..baf81578f5 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -445,6 +445,18 @@ language governing permissions and limitations under the License. --> 1.6.0-SNAPSHOT nar + + org.apache.nifi + nifi-elasticsearch-client-service-nar + 1.6.0-SNAPSHOT + nar + + + org.apache.nifi + nifi-elasticsearch-restapi-nar + 1.6.0-SNAPSHOT + nar + org.apache.nifi nifi-lumberjack-nar diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/pom.xml index ba5e9aa33d..bea8596ae7 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/pom.xml +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/pom.xml @@ -53,6 +53,11 @@ language governing permissions and limitations under the License. --> transport ${es.version} + + org.elasticsearch.client + rest + ${es.version} + org.apache.nifi nifi-ssl-context-service-api diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 8db7223075..ba9da13c62 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -14,4 +14,4 @@ # limitations under the License. org.apache.nifi.processors.elasticsearch.FetchElasticsearch5 org.apache.nifi.processors.elasticsearch.PutElasticsearch5 -org.apache.nifi.processors.elasticsearch.DeleteElasticsearch5 +org.apache.nifi.processors.elasticsearch.DeleteElasticsearch5 \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-nar/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-nar/pom.xml new file mode 100644 index 0000000000..6dfa8f7bb3 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-nar/pom.xml @@ -0,0 +1,47 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-elasticsearch-bundle + 1.6.0-SNAPSHOT + + + nifi-elasticsearch-client-service-nar + 1.6.0-SNAPSHOT + nar + + true + true + + + + + org.apache.nifi + nifi-standard-services-api-nar + 1.6.0-SNAPSHOT + nar + + + org.apache.nifi + nifi-elasticsearch-client-service + 1.6.0-SNAPSHOT + + + + diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..ee59a5d3d4 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,285 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for the these +subcomponents is subject to the terms and conditions of the following +licenses. + +The binary distribution of this product bundles 'HdrHistogram' which is available under a 2-Clause BSD style license: + + Copyright (c) 2012, 2013, 2014 Gil Tene + Copyright (c) 2014 Michael Barker + Copyright (c) 2014 Matt Warren + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + THE POSSIBILITY OF SUCH DAMAGE. + +The binary distribution of this product bundles 'Bouncy Castle JDK 1.5 Provider' + under an MIT style license. + + Copyright (c) 2000 - 2015 The Legion of the Bouncy Castle Inc. (http://www.bouncycastle.org) + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. + + +The binary distribution of this product bundles 'JOpt Simple' + under an MIT style license. + Copyright (c) 2004-2015 Paul R. Holser, Jr. + + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..7368deb415 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,429 @@ +nifi-elasticsearch-client-service-nar +Copyright 2015-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Elasticsearch + The following NOTICE information applies: + Elasticsearch + Copyright 2009-2015 Elasticsearch + + (ASLv2) Apache Commons IO + The following NOTICE information applies: + Apache Commons IO + Copyright 2002-2016 The Apache Software Foundation + + (ASLv2) Apache Lucene + The following NOTICE information applies: + Apache Lucene + Copyright 2014 The Apache Software Foundation + + Includes software from other Apache Software Foundation projects, + including, but not limited to: + - Apache Ant + - Apache Jakarta Regexp + - Apache Commons + - Apache Xerces + + ICU4J, (under analysis/icu) is licensed under an MIT styles license + and Copyright (c) 1995-2008 International Business Machines Corporation and others + + Some data files (under analysis/icu/src/data) are derived from Unicode data such + as the Unicode Character Database. See http://unicode.org/copyright.html for more + details. + + Brics Automaton (under core/src/java/org/apache/lucene/util/automaton) is + BSD-licensed, created by Anders Møller. See http://www.brics.dk/automaton/ + + The levenshtein automata tables (under core/src/java/org/apache/lucene/util/automaton) were + automatically generated with the moman/finenight FSA library, created by + Jean-Philippe Barrette-LaPierre. This library is available under an MIT license, + see http://sites.google.com/site/rrettesite/moman and + http://bitbucket.org/jpbarrette/moman/overview/ + + The class org.apache.lucene.util.WeakIdentityMap was derived from + the Apache CXF project and is Apache License 2.0. + + The Google Code Prettify is Apache License 2.0. + See http://code.google.com/p/google-code-prettify/ + + JUnit (junit-4.10) is licensed under the Common Public License v. 1.0 + See http://junit.sourceforge.net/cpl-v10.html + + This product includes code (JaspellTernarySearchTrie) from Java Spelling Checkin + g Package (jaspell): http://jaspell.sourceforge.net/ + License: The BSD License (http://www.opensource.org/licenses/bsd-license.php) + + The snowball stemmers in + analysis/common/src/java/net/sf/snowball + were developed by Martin Porter and Richard Boulton. + The snowball stopword lists in + analysis/common/src/resources/org/apache/lucene/analysis/snowball + were developed by Martin Porter and Richard Boulton. + The full snowball package is available from + http://snowball.tartarus.org/ + + The KStem stemmer in + analysis/common/src/org/apache/lucene/analysis/en + was developed by Bob Krovetz and Sergio Guzman-Lara (CIIR-UMass Amherst) + under the BSD-license. + + The Arabic,Persian,Romanian,Bulgarian, and Hindi analyzers (common) come with a default + stopword list that is BSD-licensed created by Jacques Savoy. These files reside in: + analysis/common/src/resources/org/apache/lucene/analysis/ar/stopwords.txt, + analysis/common/src/resources/org/apache/lucene/analysis/fa/stopwords.txt, + analysis/common/src/resources/org/apache/lucene/analysis/ro/stopwords.txt, + analysis/common/src/resources/org/apache/lucene/analysis/bg/stopwords.txt, + analysis/common/src/resources/org/apache/lucene/analysis/hi/stopwords.txt + See http://members.unine.ch/jacques.savoy/clef/index.html. + + The German,Spanish,Finnish,French,Hungarian,Italian,Portuguese,Russian and Swedish light stemmers + (common) are based on BSD-licensed reference implementations created by Jacques Savoy and + Ljiljana Dolamic. These files reside in: + analysis/common/src/java/org/apache/lucene/analysis/de/GermanLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/de/GermanMinimalStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/es/SpanishLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/fi/FinnishLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/fr/FrenchLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/fr/FrenchMinimalStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/hu/HungarianLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/it/ItalianLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/pt/PortugueseLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/ru/RussianLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/sv/SwedishLightStemmer.java + + The Stempel analyzer (stempel) includes BSD-licensed software developed + by the Egothor project http://egothor.sf.net/, created by Leo Galambos, Martin Kvapil, + and Edmond Nolan. + + The Polish analyzer (stempel) comes with a default + stopword list that is BSD-licensed created by the Carrot2 project. The file resides + in stempel/src/resources/org/apache/lucene/analysis/pl/stopwords.txt. + See http://project.carrot2.org/license.html. + + The SmartChineseAnalyzer source code (smartcn) was + provided by Xiaoping Gao and copyright 2009 by www.imdict.net. + + WordBreakTestUnicode_*.java (under modules/analysis/common/src/test/) + is derived from Unicode data such as the Unicode Character Database. + See http://unicode.org/copyright.html for more details. + + The Morfologik analyzer (morfologik) includes BSD-licensed software + developed by Dawid Weiss and Marcin Miłkowski (http://morfologik.blogspot.com/). + + Morfologik uses data from Polish ispell/myspell dictionary + (http://www.sjp.pl/slownik/en/) licenced on the terms of (inter alia) + LGPL and Creative Commons ShareAlike. + + Morfologic includes data from BSD-licensed dictionary of Polish (SGJP) + (http://sgjp.pl/morfeusz/) + + Servlet-api.jar and javax.servlet-*.jar are under the CDDL license, the original + source code for this can be found at http://www.eclipse.org/jetty/downloads.php + + =========================================================================== + Kuromoji Japanese Morphological Analyzer - Apache Lucene Integration + =========================================================================== + + This software includes a binary and/or source version of data from + + mecab-ipadic-2.7.0-20070801 + + which can be obtained from + + http://atilika.com/releases/mecab-ipadic/mecab-ipadic-2.7.0-20070801.tar.gz + + or + + http://jaist.dl.sourceforge.net/project/mecab/mecab-ipadic/2.7.0-20070801/mecab-ipadic-2.7.0-20070801.tar.gz + + =========================================================================== + mecab-ipadic-2.7.0-20070801 Notice + =========================================================================== + + Nara Institute of Science and Technology (NAIST), + the copyright holders, disclaims all warranties with regard to this + software, including all implied warranties of merchantability and + fitness, in no event shall NAIST be liable for + any special, indirect or consequential damages or any damages + whatsoever resulting from loss of use, data or profits, whether in an + action of contract, negligence or other tortuous action, arising out + of or in connection with the use or performance of this software. + + A large portion of the dictionary entries + originate from ICOT Free Software. The following conditions for ICOT + Free Software applies to the current dictionary as well. + + Each User may also freely distribute the Program, whether in its + original form or modified, to any third party or parties, PROVIDED + that the provisions of Section 3 ("NO WARRANTY") will ALWAYS appear + on, or be attached to, the Program, which is distributed substantially + in the same form as set out herein and that such intended + distribution, if actually made, will neither violate or otherwise + contravene any of the laws and regulations of the countries having + jurisdiction over the User or the intended distribution itself. + + NO WARRANTY + + The program was produced on an experimental basis in the course of the + research and development conducted during the project and is provided + to users as so produced on an experimental basis. Accordingly, the + program is provided without any warranty whatsoever, whether express, + implied, statutory or otherwise. The term "warranty" used herein + includes, but is not limited to, any warranty of the quality, + performance, merchantability and fitness for a particular purpose of + the program and the nonexistence of any infringement or violation of + any right of any third party. + + Each user of the program will agree and understand, and be deemed to + have agreed and understood, that there is no warranty whatsoever for + the program and, accordingly, the entire risk arising from or + otherwise connected with the program is assumed by the user. + + Therefore, neither ICOT, the copyright holder, or any other + organization that participated in or was otherwise related to the + development of the program and their respective officials, directors, + officers and other employees shall be held liable for any and all + damages, including, without limitation, general, special, incidental + and consequential damages, arising out of or otherwise in connection + with the use or inability to use the program or any product, material + or result produced or otherwise obtained by using the program, + regardless of whether they have been advised of, or otherwise had + knowledge of, the possibility of such damages at any time during the + project or thereafter. Each user will be deemed to have agreed to the + foregoing by his or her commencement of use of the program. The term + "use" as used herein includes, but is not limited to, the use, + modification, copying and distribution of the program and the + production of secondary products from the program. + + In the case where the program, whether in its original form or + modified, was distributed or delivered to or received by a user from + any person, organization or entity other than ICOT, unless it makes or + grants independently of ICOT any specific warranty to the user in + writing, such person, organization or entity, will also be exempted + from and not be held liable to the user for any such damages as noted + above as far as the program is concerned. + + (ASLv2) Carrotsearch HPPC + The following NOTICE information applies: + HPPC borrowed code, ideas or both from: + + * Apache Lucene, http://lucene.apache.org/ + (Apache license) + * Fastutil, http://fastutil.di.unimi.it/ + (Apache license) + * Koloboke, https://github.com/OpenHFT/Koloboke + (Apache license) + + (ASLv2) Joda Time + The following NOTICE information applies: + This product includes software developed by + Joda.org (http://www.joda.org/). + + (ASLv2) The Netty Project + The following NOTICE information applies: + + (ASLv2) Jackson JSON processor + The following NOTICE information applies: + # Jackson JSON processor + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has + been in development since 2007. + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + ## Licensing + + Jackson core and extension components may licensed under different licenses. + To find the details that apply to this artifact see the accompanying LICENSE file. + For more information, including possible other licensing options, contact + FasterXML.com (http://fasterxml.com). + + ## Credits + + A list of contributors may be found from CREDITS file, which is included + in some artifacts (usually source distributions); but is always available + from the source code management (SCM) system project uses. + + The Netty Project + ================= + + Please visit the Netty web site for more information: + + * http://netty.io/ + + Copyright 2011 The Netty Project + + The Netty Project licenses this file to you under the Apache License, + version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at: + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + + Also, please refer to each LICENSE..txt file, which is located in + the 'license' directory of the distribution file, for the license terms of the + components that this product depends on. + + ------------------------------------------------------------------------------- + This product contains the extensions to Java Collections Framework which has + been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene: + + * LICENSE: + * license/LICENSE.jsr166y.txt (Public Domain) + * HOMEPAGE: + * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/ + * http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/ + + This product contains a modified version of Robert Harder's Public Domain + Base64 Encoder and Decoder, which can be obtained at: + + * LICENSE: + * license/LICENSE.base64.txt (Public Domain) + * HOMEPAGE: + * http://iharder.sourceforge.net/current/java/base64/ + + This product contains a modified version of 'JZlib', a re-implementation of + zlib in pure Java, which can be obtained at: + + * LICENSE: + * license/LICENSE.jzlib.txt (BSD Style License) + * HOMEPAGE: + * http://www.jcraft.com/jzlib/ + + This product contains a modified version of 'Webbit', a Java event based + WebSocket and HTTP server: + + * LICENSE: + * license/LICENSE.webbit.txt (BSD License) + * HOMEPAGE: + * https://github.com/joewalnes/webbit + + This product optionally depends on 'Protocol Buffers', Google's data + interchange format, which can be obtained at: + + * LICENSE: + * license/LICENSE.protobuf.txt (New BSD License) + * HOMEPAGE: + * http://code.google.com/p/protobuf/ + + This product optionally depends on 'Bouncy Castle Crypto APIs' to generate + a temporary self-signed X.509 certificate when the JVM does not provide the + equivalent functionality. It can be obtained at: + + * LICENSE: + * license/LICENSE.bouncycastle.txt (MIT License) + * HOMEPAGE: + * http://www.bouncycastle.org/ + + This product optionally depends on 'SLF4J', a simple logging facade for Java, + which can be obtained at: + + * LICENSE: + * license/LICENSE.slf4j.txt (MIT License) + * HOMEPAGE: + * http://www.slf4j.org/ + + This product optionally depends on 'Apache Commons Logging', a logging + framework, which can be obtained at: + + * LICENSE: + * license/LICENSE.commons-logging.txt (Apache License 2.0) + * HOMEPAGE: + * http://commons.apache.org/logging/ + + This product optionally depends on 'Apache Log4J', a logging framework, + which can be obtained at: + + * LICENSE: + * license/LICENSE.log4j.txt (Apache License 2.0) + * HOMEPAGE: + * http://logging.apache.org/log4j/ + + This product optionally depends on 'JBoss Logging', a logging framework, + which can be obtained at: + + * LICENSE: + * license/LICENSE.jboss-logging.txt (GNU LGPL 2.1) + * HOMEPAGE: + * http://anonsvn.jboss.org/repos/common/common-logging-spi/ + + This product optionally depends on 'Apache Felix', an open source OSGi + framework implementation, which can be obtained at: + + * LICENSE: + * license/LICENSE.felix.txt (Apache License 2.0) + * HOMEPAGE: + * http://felix.apache.org/ + + (ASLv2) t-digest + The following NOTICE information applies: + The code for the t-digest was originally authored by Ted Dunning + A number of small but very helpful changes have been contributed by Adrien Grand (https://github.com/jpountz) + + (ASLv2) Apache Commons Logging + The following NOTICE information applies: + + Apache Commons Logging + Copyright 2003-2016 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + + (ASLv2) Apache Commons Codec + The following NOTICE information applies: + Apache Commons Codec + Copyright 2002-2016 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java + contains test data from http://aspell.net/test/orig/batch0.tab. + Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org) + + =============================================================================== + + The content of package org.apache.commons.codec.language.bm has been translated + from the original php source code available at http://stevemorse.org/phoneticinfo.htm + with permission from the original authors. + Original source copyright: + Copyright (c) 2008 Alexander Beider & Stephen P. Morse. + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Apache HttpComponents + The following NOTICE information applies: + Apache HttpComponents Client + Copyright 1999-2016 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + (ASLv2) Apache Log4J + The following NOTICE information applies: + Apache log4j + Copyright 2010 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml new file mode 100644 index 0000000000..290d81dd23 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml @@ -0,0 +1,163 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-elasticsearch-bundle + 1.6.0-SNAPSHOT + + + nifi-elasticsearch-client-service + jar + + + + org.apache.nifi + nifi-lookup-service-api + provided + 1.6.0-SNAPSHOT + + + org.apache.nifi + nifi-api + provided + 1.6.0-SNAPSHOT + + + org.apache.nifi + nifi-utils + provided + 1.6.0-SNAPSHOT + + + org.apache.nifi + nifi-distributed-cache-client-service-api + 1.6.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-record + 1.6.0-SNAPSHOT + provided + + + + com.fasterxml.jackson.core + jackson-databind + 2.9.4 + + + + commons-io + commons-io + 2.6 + + + + org.elasticsearch.client + rest + 5.0.1 + + + com.github.stephenc.findbugs + findbugs-annotations + 1.3.9-1 + + + org.apache.commons + commons-lang3 + 3.4 + + + org.slf4j + log4j-over-slf4j + ${org.slf4j.version} + + + + + + org.apache.nifi + nifi-mock + 1.6.0-SNAPSHOT + test + + + org.slf4j + slf4j-simple + test + + + junit + junit + test + + + org.apache.nifi + nifi-ssl-context-service-api + 1.6.0-SNAPSHOT + compile + + + org.apache.nifi + nifi-elasticsearch-client-service-api + 1.6.0-SNAPSHOT + provided + + + + + + integration-tests + + + + com.github.alexcojocaru + elasticsearch-maven-plugin + 6.0 + + testCluster + 9500 + 9400 + 5.6.2 + 90 + ${project.basedir}/src/test/resources/setup.script + + + + start-elasticsearch + pre-integration-test + + runforked + + + + stop-elasticsearch + post-integration-test + + stop + + + + + + + + + diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java new file mode 100644 index 0000000000..9896a1937b --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.nio.entity.NStringEntity; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.charset.Charset; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService { + private ObjectMapper mapper = new ObjectMapper(); + + static final private List properties; + + private RestClient client; + + private String url; + private Charset charset; + + static { + List _props = new ArrayList(); + _props.add(ElasticSearchClientService.HTTP_HOSTS); + _props.add(ElasticSearchClientService.USERNAME); + _props.add(ElasticSearchClientService.PASSWORD); + _props.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE); + _props.add(ElasticSearchClientService.CONNECT_TIMEOUT); + _props.add(ElasticSearchClientService.SOCKET_TIMEOUT); + _props.add(ElasticSearchClientService.RETRY_TIMEOUT); + _props.add(ElasticSearchClientService.CHARSET); + + properties = Collections.unmodifiableList(_props); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException { + try { + setupClient(context); + charset = Charset.forName(context.getProperty(CHARSET).getValue()); + } catch (Exception ex) { + getLogger().error("Could not initialize ElasticSearch client.", ex); + throw new InitializationException(ex); + } + } + + @OnDisabled + public void onDisabled() throws IOException { + this.client.close(); + this.url = null; + } + + private SSLContext buildSslContext(SSLContextService sslService) throws IOException, CertificateException, + NoSuchAlgorithmException, KeyStoreException, UnrecoverableKeyException, KeyManagementException { + KeyStore keyStore = KeyStore.getInstance(sslService.getKeyStoreType()); + KeyStore trustStore = KeyStore.getInstance("JKS"); + + try (final InputStream is = new FileInputStream(sslService.getKeyStoreFile())) { + keyStore.load(is, sslService.getKeyStorePassword().toCharArray()); + } + + try (final InputStream is = new FileInputStream(sslService.getTrustStoreFile())) { + trustStore.load(is, sslService.getTrustStorePassword().toCharArray()); + } + + final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory + .getDefaultAlgorithm()); + kmf.init(keyStore, sslService.getKeyStorePassword().toCharArray()); + final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory + .getDefaultAlgorithm()); + tmf.init(keyStore); + SSLContext context1 = SSLContext.getInstance(sslService.getSslAlgorithm()); + context1.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom()); + return context1; + } + + private void setupClient(ConfigurationContext context) throws MalformedURLException, InitializationException { + final String hosts = context.getProperty(HTTP_HOSTS).evaluateAttributeExpressions().getValue(); + String[] hostsSplit = hosts.split(",[\\s]*"); + this.url = hostsSplit[0]; + final SSLContextService sslService = + context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); + final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); + + final Integer connectTimeout = context.getProperty(CONNECT_TIMEOUT).asInteger(); + final Integer readTimeout = context.getProperty(SOCKET_TIMEOUT).asInteger(); + final Integer retryTimeout = context.getProperty(RETRY_TIMEOUT).asInteger(); + + HttpHost[] hh = new HttpHost[hostsSplit.length]; + for (int x = 0; x < hh.length; x++) { + URL u = new URL(hostsSplit[x]); + hh[x] = new HttpHost(u.getHost(), u.getPort(), u.getProtocol()); + } + + final SSLContext sslContext; + try { + sslContext = (sslService != null && sslService.isKeyStoreConfigured() && sslService.isTrustStoreConfigured()) + ? buildSslContext(sslService) : null; + } catch (IOException | CertificateException | NoSuchAlgorithmException | UnrecoverableKeyException + | KeyStoreException | KeyManagementException e) { + getLogger().error("Error building up SSL Context from the supplied configuration.", e); + throw new InitializationException(e); + } + + RestClientBuilder builder = RestClient.builder(hh) + .setHttpClientConfigCallback(httpClientBuilder -> { + if (sslContext != null) { + httpClientBuilder = httpClientBuilder.setSSLContext(sslContext); + } + + if (username != null && password != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(username, password)); + httpClientBuilder = httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + } + + return httpClientBuilder; + }) + .setRequestConfigCallback(requestConfigBuilder -> { + requestConfigBuilder.setConnectTimeout(connectTimeout); + requestConfigBuilder.setSocketTimeout(readTimeout); + return requestConfigBuilder; + }) + .setMaxRetryTimeoutMillis(retryTimeout); + + this.client = builder.build(); + } + + private Response runQuery(String query, String index, String type) throws IOException { + StringBuilder sb = new StringBuilder() + .append("/") + .append(index); + if (type != null && !type.equals("")) { + sb.append("/") + .append(type); + } + + sb.append("/_search"); + + HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON); + + return client.performRequest("POST", sb.toString(), Collections.emptyMap(), queryEntity); + } + + private Map parseResponse(Response response) throws IOException { + final int code = response.getStatusLine().getStatusCode(); + + if (code >= 200 & code < 300) { + InputStream inputStream = response.getEntity().getContent(); + byte[] result = IOUtils.toByteArray(inputStream); + inputStream.close(); + return mapper.readValue(new String(result, charset), Map.class); + } else { + String errorMessage = String.format("ElasticSearch reported an error while trying to run the query: %s", + response.getStatusLine().getReasonPhrase()); + throw new IOException(errorMessage); + } + } + + @Override + public SearchResponse search(String query, String index, String type) throws IOException { + Response response = runQuery(query, index, type); + Map parsed = parseResponse(response); + + int took = (Integer)parsed.get("took"); + boolean timedOut = (Boolean)parsed.get("timed_out"); + Map aggregations = parsed.get("aggregations") != null + ? (Map)parsed.get("aggregations") : new HashMap<>(); + Map hitsParent = (Map)parsed.get("hits"); + int count = (Integer)hitsParent.get("total"); + List> hits = (List>)hitsParent.get("hits"); + + SearchResponse esr = new SearchResponse(hits, aggregations, count, took, timedOut); + + if (getLogger().isDebugEnabled()) { + StringBuilder sb = new StringBuilder(); + sb.append("******************"); + sb.append(String.format("Took: %d", took)); + sb.append(String.format("Timed out: %s", timedOut)); + sb.append(String.format("Aggregation count: %d", aggregations.size())); + sb.append(String.format("Hit count: %d", hits.size())); + sb.append(String.format("Total found: %d", count)); + sb.append("******************"); + + getLogger().debug(sb.toString()); + } + + return esr; + } + + @Override + public String getTransitUrl(String index, String type) { + return new StringBuilder() + .append(this.url) + .append(index != null && !index.equals("") ? "/" : "") + .append(index != null ? index : "") + .append(type != null && !type.equals("") ? "/" : "") + .append(type != null ? type : "") + .toString(); + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000000..161f652690 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.java new file mode 100644 index 0000000000..7ab618d319 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearch5ClientService_IT.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch.integration; + +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl; +import org.apache.nifi.elasticsearch.SearchResponse; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ElasticSearch5ClientService_IT { + + private TestRunner runner; + private ElasticSearchClientServiceImpl service; + + @Before + public void before() throws Exception { + runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class); + service = new ElasticSearchClientServiceImpl(); + runner.addControllerService("Client Service", service); + runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, "http://localhost:9400"); + runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000"); + runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000"); + runner.setProperty(service, ElasticSearchClientService.RETRY_TIMEOUT, "60000"); + try { + runner.enableControllerService(service); + } catch (Exception ex) { + ex.printStackTrace(); + throw ex; + } + } + + @After + public void after() throws Exception { + service.onDisabled(); + } + + @Test + public void testBasicSearch() throws Exception { + String query = "{\n" + + "\t\"size\": 10,\n" + + "\t\"query\": {\n" + + "\t\t\"match_all\": {}\n" + + "\t},\n" + + "\t\"aggs\": {\n" + + "\t\t\"term_counts\": {\n" + + "\t\t\t\"terms\": {\n" + + "\t\t\t\t\"field\": \"msg\",\n" + + "\t\t\t\t\"size\": 5\n" + + "\t\t\t}\n" + + "\t\t}\n" + + "\t}\n" + + "}"; + SearchResponse response = service.search(query, "messages", "message"); + Assert.assertNotNull("Response was null", response); + + Assert.assertEquals("Wrong count", response.getNumberOfHits(), 15); + Assert.assertFalse("Timed out", response.isTimedOut()); + Assert.assertNotNull("Hits was null", response.getHits()); + Assert.assertEquals("Wrong number of hits", 10, response.getHits().size()); + Assert.assertNotNull("Aggregations are missing", response.getAggregations()); + Assert.assertEquals("Aggregation count is wrong", 1, response.getAggregations().size()); + + Map termCounts = (Map) response.getAggregations().get("term_counts"); + Assert.assertNotNull("Term counts was missing", termCounts); + List> buckets = (List>) termCounts.get("buckets"); + Assert.assertNotNull("Buckets branch was empty", buckets); + Map expected = new HashMap<>(); + expected.put("one", 1); + expected.put("two", 2); + expected.put("three", 3); + expected.put("four", 4); + expected.put("five", 5); + + for (Map aggRes : buckets) { + String key = (String)aggRes.get("key"); + Integer docCount = (Integer)aggRes.get("doc_count"); + + Assert.assertEquals(String.format("%s did not match", key), expected.get(key), docCount); + } + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/TestControllerServiceProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/TestControllerServiceProcessor.java new file mode 100644 index 0000000000..674cc147b4 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/TestControllerServiceProcessor.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch.integration; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.ArrayList; +import java.util.List; + +public class TestControllerServiceProcessor extends AbstractProcessor { + + static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder() + .name("Client Service") + .description("ElasticSearchClientServiceImpl") + .identifiesControllerService(ElasticSearchClientServiceImpl.class) + .required(true) + .build(); + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + } + + @Override + protected List getSupportedPropertyDescriptors() { + List propDescs = new ArrayList<>(); + propDescs.add(CLIENT_SERVICE); + return propDescs; + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup.script b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup.script new file mode 100644 index 0000000000..69671b6042 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup.script @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#create mapping +PUT:messages/:{ "mappings":{"message":{ "properties":{ "msg":{"type":"keyword"}}}}} +#add document +PUT:messages/message/1:{ "msg ":"one" } +PUT:messages/message/2:{ "msg ":"two" } +PUT:messages/message/3:{ "msg ":"two" } +PUT:messages/message/4:{ "msg ":"three" } +PUT:messages/message/5:{ "msg ":"three" } +PUT:messages/message/6:{ "msg ":"three" } +PUT:messages/message/7:{ "msg ":"four" } +PUT:messages/message/8:{ "msg ":"four" } +PUT:messages/message/9:{ "msg ":"four" } +PUT:messages/message/10:{ "msg ":"four" } +PUT:messages/message/11:{ "msg ":"five" } +PUT:messages/message/12:{ "msg ":"five" } +PUT:messages/message/13:{ "msg ":"five" } +PUT:messages/message/14:{ "msg ":"five" } +PUT:messages/message/15:{ "msg ":"five" } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-nar/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-nar/pom.xml new file mode 100644 index 0000000000..a1d39a344f --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-nar/pom.xml @@ -0,0 +1,43 @@ + + + + 4.0.0 + + nifi-elasticsearch-bundle + org.apache.nifi + 1.6.0-SNAPSHOT + + + org.apache.nifi + nifi-elasticsearch-restapi-nar + nar + + true + true + 6.2.1 + + + + + org.apache.nifi + nifi-standard-services-api-nar + 1.6.0-SNAPSHOT + nar + + + org.apache.nifi + nifi-elasticsearch-restapi-processors + 1.6.0-SNAPSHOT + + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..ee59a5d3d4 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,285 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for the these +subcomponents is subject to the terms and conditions of the following +licenses. + +The binary distribution of this product bundles 'HdrHistogram' which is available under a 2-Clause BSD style license: + + Copyright (c) 2012, 2013, 2014 Gil Tene + Copyright (c) 2014 Michael Barker + Copyright (c) 2014 Matt Warren + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + THE POSSIBILITY OF SUCH DAMAGE. + +The binary distribution of this product bundles 'Bouncy Castle JDK 1.5 Provider' + under an MIT style license. + + Copyright (c) 2000 - 2015 The Legion of the Bouncy Castle Inc. (http://www.bouncycastle.org) + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. + + +The binary distribution of this product bundles 'JOpt Simple' + under an MIT style license. + Copyright (c) 2004-2015 Paul R. Holser, Jr. + + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..94c9ae2717 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,429 @@ +nifi-elasticsearch-restapi-nar +Copyright 2015-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Elasticsearch + The following NOTICE information applies: + Elasticsearch + Copyright 2009-2015 Elasticsearch + + (ASLv2) Apache Commons IO + The following NOTICE information applies: + Apache Commons IO + Copyright 2002-2016 The Apache Software Foundation + + (ASLv2) Apache Lucene + The following NOTICE information applies: + Apache Lucene + Copyright 2014 The Apache Software Foundation + + Includes software from other Apache Software Foundation projects, + including, but not limited to: + - Apache Ant + - Apache Jakarta Regexp + - Apache Commons + - Apache Xerces + + ICU4J, (under analysis/icu) is licensed under an MIT styles license + and Copyright (c) 1995-2008 International Business Machines Corporation and others + + Some data files (under analysis/icu/src/data) are derived from Unicode data such + as the Unicode Character Database. See http://unicode.org/copyright.html for more + details. + + Brics Automaton (under core/src/java/org/apache/lucene/util/automaton) is + BSD-licensed, created by Anders Møller. See http://www.brics.dk/automaton/ + + The levenshtein automata tables (under core/src/java/org/apache/lucene/util/automaton) were + automatically generated with the moman/finenight FSA library, created by + Jean-Philippe Barrette-LaPierre. This library is available under an MIT license, + see http://sites.google.com/site/rrettesite/moman and + http://bitbucket.org/jpbarrette/moman/overview/ + + The class org.apache.lucene.util.WeakIdentityMap was derived from + the Apache CXF project and is Apache License 2.0. + + The Google Code Prettify is Apache License 2.0. + See http://code.google.com/p/google-code-prettify/ + + JUnit (junit-4.10) is licensed under the Common Public License v. 1.0 + See http://junit.sourceforge.net/cpl-v10.html + + This product includes code (JaspellTernarySearchTrie) from Java Spelling Checkin + g Package (jaspell): http://jaspell.sourceforge.net/ + License: The BSD License (http://www.opensource.org/licenses/bsd-license.php) + + The snowball stemmers in + analysis/common/src/java/net/sf/snowball + were developed by Martin Porter and Richard Boulton. + The snowball stopword lists in + analysis/common/src/resources/org/apache/lucene/analysis/snowball + were developed by Martin Porter and Richard Boulton. + The full snowball package is available from + http://snowball.tartarus.org/ + + The KStem stemmer in + analysis/common/src/org/apache/lucene/analysis/en + was developed by Bob Krovetz and Sergio Guzman-Lara (CIIR-UMass Amherst) + under the BSD-license. + + The Arabic,Persian,Romanian,Bulgarian, and Hindi analyzers (common) come with a default + stopword list that is BSD-licensed created by Jacques Savoy. These files reside in: + analysis/common/src/resources/org/apache/lucene/analysis/ar/stopwords.txt, + analysis/common/src/resources/org/apache/lucene/analysis/fa/stopwords.txt, + analysis/common/src/resources/org/apache/lucene/analysis/ro/stopwords.txt, + analysis/common/src/resources/org/apache/lucene/analysis/bg/stopwords.txt, + analysis/common/src/resources/org/apache/lucene/analysis/hi/stopwords.txt + See http://members.unine.ch/jacques.savoy/clef/index.html. + + The German,Spanish,Finnish,French,Hungarian,Italian,Portuguese,Russian and Swedish light stemmers + (common) are based on BSD-licensed reference implementations created by Jacques Savoy and + Ljiljana Dolamic. These files reside in: + analysis/common/src/java/org/apache/lucene/analysis/de/GermanLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/de/GermanMinimalStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/es/SpanishLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/fi/FinnishLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/fr/FrenchLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/fr/FrenchMinimalStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/hu/HungarianLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/it/ItalianLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/pt/PortugueseLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/ru/RussianLightStemmer.java + analysis/common/src/java/org/apache/lucene/analysis/sv/SwedishLightStemmer.java + + The Stempel analyzer (stempel) includes BSD-licensed software developed + by the Egothor project http://egothor.sf.net/, created by Leo Galambos, Martin Kvapil, + and Edmond Nolan. + + The Polish analyzer (stempel) comes with a default + stopword list that is BSD-licensed created by the Carrot2 project. The file resides + in stempel/src/resources/org/apache/lucene/analysis/pl/stopwords.txt. + See http://project.carrot2.org/license.html. + + The SmartChineseAnalyzer source code (smartcn) was + provided by Xiaoping Gao and copyright 2009 by www.imdict.net. + + WordBreakTestUnicode_*.java (under modules/analysis/common/src/test/) + is derived from Unicode data such as the Unicode Character Database. + See http://unicode.org/copyright.html for more details. + + The Morfologik analyzer (morfologik) includes BSD-licensed software + developed by Dawid Weiss and Marcin Miłkowski (http://morfologik.blogspot.com/). + + Morfologik uses data from Polish ispell/myspell dictionary + (http://www.sjp.pl/slownik/en/) licenced on the terms of (inter alia) + LGPL and Creative Commons ShareAlike. + + Morfologic includes data from BSD-licensed dictionary of Polish (SGJP) + (http://sgjp.pl/morfeusz/) + + Servlet-api.jar and javax.servlet-*.jar are under the CDDL license, the original + source code for this can be found at http://www.eclipse.org/jetty/downloads.php + + =========================================================================== + Kuromoji Japanese Morphological Analyzer - Apache Lucene Integration + =========================================================================== + + This software includes a binary and/or source version of data from + + mecab-ipadic-2.7.0-20070801 + + which can be obtained from + + http://atilika.com/releases/mecab-ipadic/mecab-ipadic-2.7.0-20070801.tar.gz + + or + + http://jaist.dl.sourceforge.net/project/mecab/mecab-ipadic/2.7.0-20070801/mecab-ipadic-2.7.0-20070801.tar.gz + + =========================================================================== + mecab-ipadic-2.7.0-20070801 Notice + =========================================================================== + + Nara Institute of Science and Technology (NAIST), + the copyright holders, disclaims all warranties with regard to this + software, including all implied warranties of merchantability and + fitness, in no event shall NAIST be liable for + any special, indirect or consequential damages or any damages + whatsoever resulting from loss of use, data or profits, whether in an + action of contract, negligence or other tortuous action, arising out + of or in connection with the use or performance of this software. + + A large portion of the dictionary entries + originate from ICOT Free Software. The following conditions for ICOT + Free Software applies to the current dictionary as well. + + Each User may also freely distribute the Program, whether in its + original form or modified, to any third party or parties, PROVIDED + that the provisions of Section 3 ("NO WARRANTY") will ALWAYS appear + on, or be attached to, the Program, which is distributed substantially + in the same form as set out herein and that such intended + distribution, if actually made, will neither violate or otherwise + contravene any of the laws and regulations of the countries having + jurisdiction over the User or the intended distribution itself. + + NO WARRANTY + + The program was produced on an experimental basis in the course of the + research and development conducted during the project and is provided + to users as so produced on an experimental basis. Accordingly, the + program is provided without any warranty whatsoever, whether express, + implied, statutory or otherwise. The term "warranty" used herein + includes, but is not limited to, any warranty of the quality, + performance, merchantability and fitness for a particular purpose of + the program and the nonexistence of any infringement or violation of + any right of any third party. + + Each user of the program will agree and understand, and be deemed to + have agreed and understood, that there is no warranty whatsoever for + the program and, accordingly, the entire risk arising from or + otherwise connected with the program is assumed by the user. + + Therefore, neither ICOT, the copyright holder, or any other + organization that participated in or was otherwise related to the + development of the program and their respective officials, directors, + officers and other employees shall be held liable for any and all + damages, including, without limitation, general, special, incidental + and consequential damages, arising out of or otherwise in connection + with the use or inability to use the program or any product, material + or result produced or otherwise obtained by using the program, + regardless of whether they have been advised of, or otherwise had + knowledge of, the possibility of such damages at any time during the + project or thereafter. Each user will be deemed to have agreed to the + foregoing by his or her commencement of use of the program. The term + "use" as used herein includes, but is not limited to, the use, + modification, copying and distribution of the program and the + production of secondary products from the program. + + In the case where the program, whether in its original form or + modified, was distributed or delivered to or received by a user from + any person, organization or entity other than ICOT, unless it makes or + grants independently of ICOT any specific warranty to the user in + writing, such person, organization or entity, will also be exempted + from and not be held liable to the user for any such damages as noted + above as far as the program is concerned. + + (ASLv2) Carrotsearch HPPC + The following NOTICE information applies: + HPPC borrowed code, ideas or both from: + + * Apache Lucene, http://lucene.apache.org/ + (Apache license) + * Fastutil, http://fastutil.di.unimi.it/ + (Apache license) + * Koloboke, https://github.com/OpenHFT/Koloboke + (Apache license) + + (ASLv2) Joda Time + The following NOTICE information applies: + This product includes software developed by + Joda.org (http://www.joda.org/). + + (ASLv2) The Netty Project + The following NOTICE information applies: + + (ASLv2) Jackson JSON processor + The following NOTICE information applies: + # Jackson JSON processor + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has + been in development since 2007. + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + ## Licensing + + Jackson core and extension components may licensed under different licenses. + To find the details that apply to this artifact see the accompanying LICENSE file. + For more information, including possible other licensing options, contact + FasterXML.com (http://fasterxml.com). + + ## Credits + + A list of contributors may be found from CREDITS file, which is included + in some artifacts (usually source distributions); but is always available + from the source code management (SCM) system project uses. + + The Netty Project + ================= + + Please visit the Netty web site for more information: + + * http://netty.io/ + + Copyright 2011 The Netty Project + + The Netty Project licenses this file to you under the Apache License, + version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at: + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + + Also, please refer to each LICENSE..txt file, which is located in + the 'license' directory of the distribution file, for the license terms of the + components that this product depends on. + + ------------------------------------------------------------------------------- + This product contains the extensions to Java Collections Framework which has + been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene: + + * LICENSE: + * license/LICENSE.jsr166y.txt (Public Domain) + * HOMEPAGE: + * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/ + * http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/ + + This product contains a modified version of Robert Harder's Public Domain + Base64 Encoder and Decoder, which can be obtained at: + + * LICENSE: + * license/LICENSE.base64.txt (Public Domain) + * HOMEPAGE: + * http://iharder.sourceforge.net/current/java/base64/ + + This product contains a modified version of 'JZlib', a re-implementation of + zlib in pure Java, which can be obtained at: + + * LICENSE: + * license/LICENSE.jzlib.txt (BSD Style License) + * HOMEPAGE: + * http://www.jcraft.com/jzlib/ + + This product contains a modified version of 'Webbit', a Java event based + WebSocket and HTTP server: + + * LICENSE: + * license/LICENSE.webbit.txt (BSD License) + * HOMEPAGE: + * https://github.com/joewalnes/webbit + + This product optionally depends on 'Protocol Buffers', Google's data + interchange format, which can be obtained at: + + * LICENSE: + * license/LICENSE.protobuf.txt (New BSD License) + * HOMEPAGE: + * http://code.google.com/p/protobuf/ + + This product optionally depends on 'Bouncy Castle Crypto APIs' to generate + a temporary self-signed X.509 certificate when the JVM does not provide the + equivalent functionality. It can be obtained at: + + * LICENSE: + * license/LICENSE.bouncycastle.txt (MIT License) + * HOMEPAGE: + * http://www.bouncycastle.org/ + + This product optionally depends on 'SLF4J', a simple logging facade for Java, + which can be obtained at: + + * LICENSE: + * license/LICENSE.slf4j.txt (MIT License) + * HOMEPAGE: + * http://www.slf4j.org/ + + This product optionally depends on 'Apache Commons Logging', a logging + framework, which can be obtained at: + + * LICENSE: + * license/LICENSE.commons-logging.txt (Apache License 2.0) + * HOMEPAGE: + * http://commons.apache.org/logging/ + + This product optionally depends on 'Apache Log4J', a logging framework, + which can be obtained at: + + * LICENSE: + * license/LICENSE.log4j.txt (Apache License 2.0) + * HOMEPAGE: + * http://logging.apache.org/log4j/ + + This product optionally depends on 'JBoss Logging', a logging framework, + which can be obtained at: + + * LICENSE: + * license/LICENSE.jboss-logging.txt (GNU LGPL 2.1) + * HOMEPAGE: + * http://anonsvn.jboss.org/repos/common/common-logging-spi/ + + This product optionally depends on 'Apache Felix', an open source OSGi + framework implementation, which can be obtained at: + + * LICENSE: + * license/LICENSE.felix.txt (Apache License 2.0) + * HOMEPAGE: + * http://felix.apache.org/ + + (ASLv2) t-digest + The following NOTICE information applies: + The code for the t-digest was originally authored by Ted Dunning + A number of small but very helpful changes have been contributed by Adrien Grand (https://github.com/jpountz) + + (ASLv2) Apache Commons Logging + The following NOTICE information applies: + + Apache Commons Logging + Copyright 2003-2016 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + + (ASLv2) Apache Commons Codec + The following NOTICE information applies: + Apache Commons Codec + Copyright 2002-2016 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java + contains test data from http://aspell.net/test/orig/batch0.tab. + Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org) + + =============================================================================== + + The content of package org.apache.commons.codec.language.bm has been translated + from the original php source code available at http://stevemorse.org/phoneticinfo.htm + with permission from the original authors. + Original source copyright: + Copyright (c) 2008 Alexander Beider & Stephen P. Morse. + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Apache HttpComponents + The following NOTICE information applies: + Apache HttpComponents Client + Copyright 1999-2016 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + (ASLv2) Apache Log4J + The following NOTICE information applies: + Apache log4j + Copyright 2010 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml new file mode 100644 index 0000000000..7ce08f8a20 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml @@ -0,0 +1,116 @@ + + + + 4.0.0 + + nifi-elasticsearch-bundle + org.apache.nifi + 1.6.0-SNAPSHOT + + + nifi-elasticsearch-restapi-processors + jar + + + 2.7 + 5.6.6 + 6.2.1 + + + + + org.apache.nifi + nifi-api + 1.6.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-properties + 1.6.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-processor-utils + 1.6.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-mock + 1.6.0-SNAPSHOT + test + + + org.elasticsearch.client + transport + ${es.version} + + + org.elasticsearch.client + elasticsearch-rest-high-level-client + ${es.version} + + + org.apache.nifi + nifi-ssl-context-service-api + 1.6.0-SNAPSHOT + + + commons-io + commons-io + 2.6 + + + org.apache.logging.log4j + log4j-api + 2.8.2 + + + org.apache.logging.log4j + log4j-core + 2.8.2 + + + org.apache.nifi + nifi-ssl-context-service + 1.6.0-SNAPSHOT + test + + + com.fasterxml.jackson.core + jackson-databind + 2.9.4 + + + org.apache.nifi + nifi-elasticsearch-client-service-api + 1.6.0-SNAPSHOT + provided + + + + + + + org.apache.rat + apache-rat-plugin + + + src/test/resources/*.json + + + + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java new file mode 100644 index 0000000000..434f185156 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.SearchResponse; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@WritesAttributes({ + @WritesAttribute(attribute = "mime.type", description = "application/json"), + @WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@Tags({"elasticsearch", "elasticsearch 5", "query", "read", "get", "json"}) +@CapabilityDescription("A processor that allows the user to run a query (with aggregations) written with the " + + "ElasticSearch JSON DSL. It does not automatically paginate queries for the user. If an incoming relationship is added to this " + + "processor, it will use the flowfile's content for the query. Care should be taken on the size of the query because the entire response " + + "from ElasticSearch will be loaded into memory all at once and converted into the resulting flowfiles.") +public class JsonQueryElasticsearch extends AbstractProcessor { + public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original") + .description("All original flowfiles that don't cause an error to occur go to this relationship. " + + "This applies even if you select the \"split up hits\" option to send individual hits to the " + + "\"hits\" relationship.").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship").build(); + + public static final Relationship REL_HITS = new Relationship.Builder().name("hits") + .description("Search hits are routed to this relationship.") + .build(); + + public static final Relationship REL_AGGREGATIONS = new Relationship.Builder().name("aggregations") + .description("Aggregations are routed to this relationship.") + .build(); + + public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder() + .name("el-rest-fetch-index") + .displayName("Index") + .description("The name of the index to read from") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder() + .name("el-rest-type") + .displayName("Type") + .description("The type of this document (used by Elasticsearch for indexing and searching)") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() + .name("el-rest-query") + .displayName("Query") + .description("A query in JSON syntax, not Lucene syntax. Ex: " + + "{\n" + + "\t\"query\": {\n" + + "\t\t\"match\": {\n" + + "\t\t\t\"name\": \"John Smith\"\n" + + "\t\t}\n" + + "\t}\n" + + "}") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor QUERY_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("el-query-attribute") + .displayName("Query Attribute") + .description("If set, the executed query will be set on each result flowfile in the specified attribute.") + .expressionLanguageSupported(true) + .addValidator(Validator.VALID) + .required(false) + .build(); + + public static final AllowableValue SPLIT_UP_YES = new AllowableValue( + "splitUp-yes", + "Yes", + "Split up results." + ); + public static final AllowableValue SPLIT_UP_HITS_NO = new AllowableValue( + "splitUp-no", + "No", + "Don't split up results." + ); + + public static final PropertyDescriptor SPLIT_UP_HITS = new PropertyDescriptor.Builder() + .name("el-rest-split-up-hits") + .displayName("Split up search results") + .description("Split up search results into one flowfile per result.") + .allowableValues(SPLIT_UP_HITS_NO, SPLIT_UP_YES) + .defaultValue(SPLIT_UP_HITS_NO.getValue()) + .required(true) + .expressionLanguageSupported(false) + .build(); + public static final PropertyDescriptor SPLIT_UP_AGGREGATIONS = new PropertyDescriptor.Builder() + .name("el-rest-split-up-aggregations") + .displayName("Split up aggregation results") + .description("Split up aggregation results into one flowfile per result.") + .allowableValues(SPLIT_UP_HITS_NO, SPLIT_UP_YES) + .defaultValue(SPLIT_UP_HITS_NO.getValue()) + .required(true) + .expressionLanguageSupported(false) + .build(); + + public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder() + .name("el-rest-client-service") + .displayName("Client Service") + .description("An ElasticSearch client service to use for running queries.") + .identifiesControllerService(ElasticSearchClientService.class) + .required(true) + .build(); + + private static final Set relationships; + private static final List propertyDescriptors; + + private volatile ElasticSearchClientService clientService; + + static { + final Set _rels = new HashSet<>(); + _rels.add(REL_ORIGINAL); + _rels.add(REL_FAILURE); + _rels.add(REL_HITS); + _rels.add(REL_AGGREGATIONS); + relationships = Collections.unmodifiableSet(_rels); + + final List descriptors = new ArrayList<>(); + descriptors.add(QUERY); + descriptors.add(QUERY_ATTRIBUTE); + descriptors.add(INDEX); + descriptors.add(TYPE); + descriptors.add(CLIENT_SERVICE); + descriptors.add(SPLIT_UP_HITS); + descriptors.add(SPLIT_UP_AGGREGATIONS); + + propertyDescriptors = Collections.unmodifiableList(descriptors); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class); + } + + @OnStopped + public void onStopped() { + this.clientService = null; + } + + + private final ObjectMapper mapper = new ObjectMapper(); + + private String getQuery(FlowFile input, ProcessContext context, ProcessSession session) throws IOException { + String retVal = null; + if (context.getProperty(QUERY).isSet()) { + retVal = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue(); + } else if (input != null) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + session.exportTo(input, out); + out.close(); + + retVal = new String(out.toByteArray()); + } + + return retVal; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile input = null; + if (context.hasIncomingConnection()) { + input = session.get(); + + if (input == null && context.hasNonLoopConnection()) { + return; + } + } + + try { + + final String query = getQuery(input, context, session); + final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue(); + final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue(); + final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet() + ? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue() + : null; + + SearchResponse response = clientService.search(query, index, type); + + Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); + if (!StringUtils.isBlank(queryAttr)) { + attributes.put(queryAttr, query); + } + + List hitsFlowFiles = handleHits(response.getHits(), context, session, input, attributes); + List aggsFlowFiles = handleAggregations(response.getAggregations(), context, session, input, attributes); + + final String transitUri = clientService.getTransitUrl(index, type); + + if (hitsFlowFiles.size() > 0) { + session.transfer(hitsFlowFiles, REL_HITS); + for (FlowFile ff : hitsFlowFiles) { + session.getProvenanceReporter().send(ff, transitUri); + } + } + + if (aggsFlowFiles.size() > 0) { + session.transfer(aggsFlowFiles, REL_AGGREGATIONS); + for (FlowFile ff : aggsFlowFiles) { + session.getProvenanceReporter().send(ff, transitUri); + } + } + + if (input != null) { + session.transfer(input, REL_ORIGINAL); + } + } catch (Exception ex) { + getLogger().error("Error processing flowfile.", ex); + if (input != null) { + session.transfer(input, REL_FAILURE); + } + context.yield(); + } + } + + private FlowFile writeAggregationFlowFileContents(String name, String json, ProcessSession session, FlowFile aggFlowFile, Map attributes) { + aggFlowFile = session.write(aggFlowFile, out -> out.write(json.getBytes())); + if (name != null) { + aggFlowFile = session.putAttribute(aggFlowFile, "aggregation.name", name); + } + + return session.putAllAttributes(aggFlowFile, attributes); + } + + private List handleAggregations(Map aggregations, ProcessContext context, ProcessSession session, FlowFile parent, Map attributes) throws IOException { + List retVal = new ArrayList<>(); + if (aggregations == null) { + return retVal; + } + String splitUpValue = context.getProperty(SPLIT_UP_AGGREGATIONS).getValue(); + + if (splitUpValue.equals(SPLIT_UP_YES.getValue())) { + for (Map.Entry agg : aggregations.entrySet()) { + FlowFile aggFlowFile = parent != null ? session.create(parent) : session.create(); + String aggJson = mapper.writeValueAsString(agg.getValue()); + retVal.add(writeAggregationFlowFileContents(agg.getKey(), aggJson, session, aggFlowFile, attributes)); + } + } else { + String json = mapper.writeValueAsString(aggregations); + retVal.add(writeAggregationFlowFileContents(null, json, session, parent != null ? session.create(parent) : session.create(), attributes)); + } + + return retVal; + } + + private FlowFile writeHitFlowFile(String json, ProcessSession session, FlowFile hitFlowFile, Map attributes) { + hitFlowFile = session.write(hitFlowFile, out -> out.write(json.getBytes())); + + return session.putAllAttributes(hitFlowFile, attributes); + } + + private List handleHits(List> hits, ProcessContext context, ProcessSession session, FlowFile parent, Map attributes) throws IOException { + String splitUpValue = context.getProperty(SPLIT_UP_HITS).getValue(); + List retVal = new ArrayList<>(); + if (splitUpValue.equals(SPLIT_UP_YES.getValue())) { + for (Map hit : hits) { + FlowFile hitFlowFile = parent != null ? session.create(parent) : session.create(); + String json = mapper.writeValueAsString(hit); + + retVal.add(writeHitFlowFile(json, session, hitFlowFile, attributes)); + } + } else { + FlowFile hitFlowFile = parent != null ? session.create(parent) : session.create(); + String json = mapper.writeValueAsString(hits); + retVal.add(writeHitFlowFile(json, session, hitFlowFile, attributes)); + } + + return retVal; + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000000..d57ff36853 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.java new file mode 100644 index 0000000000..cd7ac196e1 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearchTest.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.elasticsearch; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class JsonQueryElasticsearchTest { + private static final String INDEX_NAME = "messages"; + + public void testCounts(TestRunner runner, int success, int hits, int failure, int aggregations) { + runner.assertTransferCount(JsonQueryElasticsearch.REL_ORIGINAL, success); + runner.assertTransferCount(JsonQueryElasticsearch.REL_HITS, hits); + runner.assertTransferCount(JsonQueryElasticsearch.REL_FAILURE, failure); + runner.assertTransferCount(JsonQueryElasticsearch.REL_AGGREGATIONS, aggregations); + } + + @Test + public void testBasicQuery() throws Exception { + + JsonQueryElasticsearch processor = new JsonQueryElasticsearch(); + TestRunner runner = TestRunners.newTestRunner(processor); + TestElasticSearchClientService service = new TestElasticSearchClientService(false); + runner.addControllerService("esService", service); + runner.enableControllerService(service); + runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService"); + runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME); + runner.setProperty(JsonQueryElasticsearch.TYPE, "message"); + runner.setValidateExpressionUsage(true); + runner.setProperty(JsonQueryElasticsearch.QUERY, "{ \"query\": { \"match_all\": {} }}"); + + runner.enqueue("test"); + runner.run(1, true, true); + testCounts(runner, 1, 1, 0, 0); + + runner.setProperty(JsonQueryElasticsearch.SPLIT_UP_HITS, JsonQueryElasticsearch.SPLIT_UP_YES); + runner.clearProvenanceEvents(); + runner.clearTransferState(); + runner.enqueue("test"); + runner.run(1, true, true); + testCounts(runner, 1, 10, 0, 0); + } + + @Test + public void testAggregations() throws Exception { + String query = "{\n" + + "\t\"query\": {\n" + + "\t\t\"match_all\": {}\n" + + "\t},\n" + + "\t\"aggs\": {\n" + + "\t\t\"test_agg\": {\n" + + "\t\t\t\"terms\": {\n" + + "\t\t\t\t\"field\": \"msg\"\n" + + "\t\t\t}\n" + + "\t\t},\n" + + "\t\t\"test_agg2\": {\n" + + "\t\t\t\"terms\": {\n" + + "\t\t\t\t\"field\": \"msg\"\n" + + "\t\t\t}\n" + + "\t\t}\n" + + "\t}\n" + + "}"; + + + JsonQueryElasticsearch processor = new JsonQueryElasticsearch(); + TestRunner runner = TestRunners.newTestRunner(processor); + TestElasticSearchClientService service = new TestElasticSearchClientService(true); + runner.addControllerService("esService", service); + runner.enableControllerService(service); + runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService"); + runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME); + runner.setProperty(JsonQueryElasticsearch.TYPE, "message"); + runner.setValidateExpressionUsage(true); + runner.setProperty(JsonQueryElasticsearch.QUERY, query); + + runner.enqueue("test"); + runner.run(1, true, true); + testCounts(runner, 1, 1, 0, 1); + + runner.clearTransferState(); + + //Test with the query parameter and no incoming connection + runner.setIncomingConnection(false); + runner.run(1, true, true); + testCounts(runner, 0, 1, 0, 1); + runner.setIncomingConnection(true); + + + runner.clearTransferState(); + runner.clearProvenanceEvents(); + runner.setProperty(JsonQueryElasticsearch.SPLIT_UP_AGGREGATIONS, JsonQueryElasticsearch.SPLIT_UP_YES); + runner.enqueue("test"); + runner.run(1, true, true); + testCounts(runner, 1, 1, 0, 2); + + runner.clearProvenanceEvents(); + runner.clearTransferState(); + + query = "{\n" + + "\t\"query\": {\n" + + "\t\t\"match_all\": {}\n" + + "\t},\n" + + "\t\"aggs\": {\n" + + "\t\t\"test_agg\": {\n" + + "\t\t\t\"terms\": {\n" + + "\t\t\t\t\"field\": \"${fieldValue}\"\n" + + "\t\t\t}\n" + + "\t\t},\n" + + "\t\t\"test_agg2\": {\n" + + "\t\t\t\"terms\": {\n" + + "\t\t\t\t\"field\": \"${fieldValue}\"\n" + + "\t\t\t}\n" + + "\t\t}\n" + + "\t}\n" + + "}"; + runner.setVariable("fieldValue", "msg"); + runner.setVariable("es.index", INDEX_NAME); + runner.setVariable("es.type", "msg"); + runner.setProperty(JsonQueryElasticsearch.QUERY, query); + runner.setProperty(JsonQueryElasticsearch.INDEX, "${es.index}"); + runner.setProperty(JsonQueryElasticsearch.TYPE, "${es.type}"); + runner.setValidateExpressionUsage(true); + runner.enqueue("test"); + runner.run(1, true, true); + testCounts(runner, 1, 1, 0, 2); + } + + @Test + public void testErrorDuringSearch() throws Exception { + String query = "{\n" + + "\t\"query\": {\n" + + "\t\t\"match_all\": {}\n" + + "\t},\n" + + "\t\"aggs\": {\n" + + "\t\t\"test_agg\": {\n" + + "\t\t\t\"terms\": {\n" + + "\t\t\t\t\"field\": \"msg\"\n" + + "\t\t\t}\n" + + "\t\t},\n" + + "\t\t\"test_agg2\": {\n" + + "\t\t\t\"terms\": {\n" + + "\t\t\t\t\"field\": \"msg\"\n" + + "\t\t\t}\n" + + "\t\t}\n" + + "\t}\n" + + "}"; + + + JsonQueryElasticsearch processor = new JsonQueryElasticsearch(); + TestRunner runner = TestRunners.newTestRunner(processor); + TestElasticSearchClientService service = new TestElasticSearchClientService(true); + service.setThrowErrorInSearch(true); + runner.addControllerService("esService", service); + runner.enableControllerService(service); + runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService"); + runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME); + runner.setProperty(JsonQueryElasticsearch.TYPE, "message"); + runner.setValidateExpressionUsage(true); + runner.setProperty(JsonQueryElasticsearch.QUERY, query); + + runner.enqueue("test"); + runner.run(1, true, true); + testCounts(runner, 0, 0, 1, 0); + } + + @Test + public void testQueryAttribute() throws Exception { + final String query = "{\n" + + "\t\"query\": {\n" + + "\t\t\"match_all\": {}\n" + + "\t},\n" + + "\t\"aggs\": {\n" + + "\t\t\"test_agg\": {\n" + + "\t\t\t\"terms\": {\n" + + "\t\t\t\t\"field\": \"msg\"\n" + + "\t\t\t}\n" + + "\t\t},\n" + + "\t\t\"test_agg2\": {\n" + + "\t\t\t\"terms\": {\n" + + "\t\t\t\t\"field\": \"msg\"\n" + + "\t\t\t}\n" + + "\t\t}\n" + + "\t}\n" + + "}"; + final String queryAttr = "es.query"; + + + JsonQueryElasticsearch processor = new JsonQueryElasticsearch(); + TestRunner runner = TestRunners.newTestRunner(processor); + TestElasticSearchClientService service = new TestElasticSearchClientService(true); + runner.addControllerService("esService", service); + runner.enableControllerService(service); + runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService"); + runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME); + runner.setProperty(JsonQueryElasticsearch.TYPE, "message"); + runner.setValidateExpressionUsage(true); + runner.setProperty(JsonQueryElasticsearch.QUERY, query); + runner.setProperty(JsonQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr); + + runner.enqueue("test"); + runner.run(1, true, true); + testCounts(runner, 1, 1, 0, 1); + List flowFiles = runner.getFlowFilesForRelationship(JsonQueryElasticsearch.REL_AGGREGATIONS); + flowFiles.addAll(runner.getFlowFilesForRelationship(JsonQueryElasticsearch.REL_HITS)); + + for (MockFlowFile mockFlowFile : flowFiles) { + String attr = mockFlowFile.getAttribute(queryAttr); + Assert.assertNotNull("Missing query attribute", attr); + Assert.assertEquals("Query had wrong value.", query, attr); + } + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java new file mode 100644 index 0000000000..445a0931d3 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestElasticSearchClientService.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.elasticsearch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.SearchResponse; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class TestElasticSearchClientService extends AbstractControllerService implements ElasticSearchClientService { + private boolean returnAggs; + private boolean throwErrorInSearch; + + public TestElasticSearchClientService(boolean returnAggs) { + this.returnAggs = returnAggs; + } + + @Override + public SearchResponse search(String query, String index, String type) throws IOException { + if (throwErrorInSearch) { + throw new IOException(); + } + + ObjectMapper mapper = new ObjectMapper(); + List> hits = (List>)mapper.readValue(HITS_RESULT, List.class); + Map aggs = returnAggs ? (Map)mapper.readValue(AGGS_RESULT, Map.class) : null; + SearchResponse response = new SearchResponse(hits, aggs, 15, 5, false); + return response; + } + + @Override + public String getTransitUrl(String index, String type) { + return String.format("http://localhost:9400/%s/%s", index, type); + } + + private static final String AGGS_RESULT = "{\n" + + " \"term_agg2\": {\n" + + " \"doc_count_error_upper_bound\": 0,\n" + + " \"sum_other_doc_count\": 0,\n" + + " \"buckets\": [\n" + + " {\n" + + " \"key\": \"five\",\n" + + " \"doc_count\": 5\n" + + " },\n" + + " {\n" + + " \"key\": \"four\",\n" + + " \"doc_count\": 4\n" + + " },\n" + + " {\n" + + " \"key\": \"three\",\n" + + " \"doc_count\": 3\n" + + " },\n" + + " {\n" + + " \"key\": \"two\",\n" + + " \"doc_count\": 2\n" + + " },\n" + + " {\n" + + " \"key\": \"one\",\n" + + " \"doc_count\": 1\n" + + " }\n" + + " ]\n" + + " },\n" + + " \"term_agg\": {\n" + + " \"doc_count_error_upper_bound\": 0,\n" + + " \"sum_other_doc_count\": 0,\n" + + " \"buckets\": [\n" + + " {\n" + + " \"key\": \"five\",\n" + + " \"doc_count\": 5\n" + + " },\n" + + " {\n" + + " \"key\": \"four\",\n" + + " \"doc_count\": 4\n" + + " },\n" + + " {\n" + + " \"key\": \"three\",\n" + + " \"doc_count\": 3\n" + + " },\n" + + " {\n" + + " \"key\": \"two\",\n" + + " \"doc_count\": 2\n" + + " },\n" + + " {\n" + + " \"key\": \"one\",\n" + + " \"doc_count\": 1\n" + + " }\n" + + " ]\n" + + " }\n" + + " }"; + + private static final String HITS_RESULT = "[\n" + + " {\n" + + " \"_index\": \"messages\",\n" + + " \"_type\": \"message\",\n" + + " \"_id\": \"14\",\n" + + " \"_score\": 1,\n" + + " \"_source\": {\n" + + " \"msg\": \"five\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"_index\": \"messages\",\n" + + " \"_type\": \"message\",\n" + + " \"_id\": \"5\",\n" + + " \"_score\": 1,\n" + + " \"_source\": {\n" + + " \"msg\": \"three\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"_index\": \"messages\",\n" + + " \"_type\": \"message\",\n" + + " \"_id\": \"8\",\n" + + " \"_score\": 1,\n" + + " \"_source\": {\n" + + " \"msg\": \"four\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"_index\": \"messages\",\n" + + " \"_type\": \"message\",\n" + + " \"_id\": \"9\",\n" + + " \"_score\": 1,\n" + + " \"_source\": {\n" + + " \"msg\": \"four\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"_index\": \"messages\",\n" + + " \"_type\": \"message\",\n" + + " \"_id\": \"10\",\n" + + " \"_score\": 1,\n" + + " \"_source\": {\n" + + " \"msg\": \"four\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"_index\": \"messages\",\n" + + " \"_type\": \"message\",\n" + + " \"_id\": \"12\",\n" + + " \"_score\": 1,\n" + + " \"_source\": {\n" + + " \"msg\": \"five\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"_index\": \"messages\",\n" + + " \"_type\": \"message\",\n" + + " \"_id\": \"2\",\n" + + " \"_score\": 1,\n" + + " \"_source\": {\n" + + " \"msg\": \"two\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"_index\": \"messages\",\n" + + " \"_type\": \"message\",\n" + + " \"_id\": \"4\",\n" + + " \"_score\": 1,\n" + + " \"_source\": {\n" + + " \"msg\": \"three\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"_index\": \"messages\",\n" + + " \"_type\": \"message\",\n" + + " \"_id\": \"6\",\n" + + " \"_score\": 1,\n" + + " \"_source\": {\n" + + " \"msg\": \"three\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"_index\": \"messages\",\n" + + " \"_type\": \"message\",\n" + + " \"_id\": \"15\",\n" + + " \"_score\": 1,\n" + + " \"_source\": {\n" + + " \"msg\": \"five\"\n" + + " }\n" + + " }\n" + + " ]"; + + public void setThrowErrorInSearch(boolean throwErrorInSearch) { + this.throwErrorInSearch = throwErrorInSearch; + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/DocumentExample.json b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/DocumentExample.json new file mode 100644 index 0000000000..66449cf1e1 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/DocumentExample.json @@ -0,0 +1,21 @@ +{ + "created_at": "Thu Jan 21 16:02:46 +0000 2016", + "text": "This is a test document from a mock social media service", + "contributors": null, + "id": 28039652140, + "shares": null, + "geographic_location": null, + "userinfo": { + "name": "Not A. Person", + "location": "Orlando, FL", + "created_at": "Fri Oct 24 23:22:09 +0000 2008", + "follow_count": 1, + "url": "http://not.a.real.site", + "id": 16958875, + "lang": "en", + "time_zone": "Mountain Time (US & Canada)", + "description": "I'm a test person.", + "following_count": 71, + "screen_name": "Nobody" + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/log4j.properties b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/log4j.properties new file mode 100644 index 0000000000..cc58727fa0 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/resources/log4j.properties @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +log4j.rootLogger=INFO, CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender + +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x \u2013 %m%n + +log4j.logger.org.apache.flume = DEBUG \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml index 7802637fb6..f7073516fb 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml @@ -23,10 +23,14 @@ language governing permissions and limitations under the License. --> pom + nifi-elasticsearch-client-service + nifi-elasticsearch-client-service-nar nifi-elasticsearch-nar nifi-elasticsearch-processors nifi-elasticsearch-5-nar nifi-elasticsearch-5-processors + nifi-elasticsearch-restapi-nar + nifi-elasticsearch-restapi-processors @@ -41,7 +45,12 @@ language governing permissions and limitations under the License. --> nifi-elasticsearch-5-processors 1.6.0-SNAPSHOT + + org.apache.nifi + nifi-elasticsearch-restapi-processors + 1.6.0-SNAPSHOT + - \ No newline at end of file + diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-elasticsearch-client-service-api/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-elasticsearch-client-service-api/pom.xml new file mode 100644 index 0000000000..3dce698c71 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-elasticsearch-client-service-api/pom.xml @@ -0,0 +1,48 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-standard-services + 1.6.0-SNAPSHOT + + + nifi-elasticsearch-client-service-api + jar + + + + org.apache.nifi + nifi-api + 1.6.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-utils + 1.6.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-ssl-context-service-api + 1.6.0-SNAPSHOT + provided + + + diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java new file mode 100644 index 0000000000..1d44bc8148 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.elasticsearch; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; + +import java.io.IOException; + +@Tags({"elasticsearch", "client"}) +@CapabilityDescription("A controller service for accessing an ElasticSearch client.") +public interface ElasticSearchClientService extends ControllerService { + PropertyDescriptor HTTP_HOSTS = new PropertyDescriptor.Builder() + .name("el-cs-http-hosts") + .displayName("HTTP Hosts") + .description("A comma-separated list of HTTP hosts that host ElasticSearch query nodes.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("el-cs-ssl-context-service") + .displayName("SSL Context Service") + .description("The SSL Context Service used to provide client certificate information for TLS/SSL " + + "connections. This service only applies if the Elasticsearch endpoint(s) have been secured with TLS/SSL.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .name("el-cs-username") + .displayName("Username") + .description("The username to use with XPack security.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("el-cs-password") + .displayName("Password") + .description("The password to use with XPack security.") + .required(false) + .sensitive(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + PropertyDescriptor CONNECT_TIMEOUT = new PropertyDescriptor.Builder() + .name("el-cs-connect-timeout") + .displayName("Connect timeout") + .description("Controls the amount of time, in milliseconds, before a timeout occurs when trying to connect.") + .required(true) + .defaultValue("5000") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + PropertyDescriptor SOCKET_TIMEOUT = new PropertyDescriptor.Builder() + .name("el-cs-socket-timeout") + .displayName("Read timeout") + .description("Controls the amount of time, in milliseconds, before a timeout occurs when waiting for a response.") + .required(true) + .defaultValue("60000") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + PropertyDescriptor RETRY_TIMEOUT = new PropertyDescriptor.Builder() + .name("el-cs-retry-timeout") + .displayName("Retry timeout") + .description("Controls the amount of time, in milliseconds, before a timeout occurs when retrying the operation.") + .required(true) + .defaultValue("60000") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("el-cs-charset") + .displayName("Charset") + .description("The charset to use for interpreting the response from ElasticSearch.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + /** + * Perform a search using the JSON DSL. + * + * @param query A JSON string reprensenting the query. + * @param index The index to target. Optional. + * @param type The type to target. Optional. Will not be used in future versions of ElasticSearch. + * @return A SearchResponse object if successful. + */ + SearchResponse search(String query, String index, String type) throws IOException; + + /** + * Build a transit URL to use with the provenance reporter. + * @param index Index targeted. Optional. + * @param type Type targeted. Optional + * @return a URL describing the ElasticSearch cluster. + */ + String getTransitUrl(String index, String type); +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/SearchResponse.java b/nifi-nar-bundles/nifi-standard-services/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/SearchResponse.java new file mode 100644 index 0000000000..725ad417fa --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/SearchResponse.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch; + +import java.util.List; +import java.util.Map; + +public class SearchResponse { + private List> hits; + private Map aggregations; + private long numberOfHits; + private int took; + private boolean timedOut; + + public SearchResponse(List> hits, Map aggregations, + int numberOfHits, int took, boolean timedOut) { + this.hits = hits; + this.aggregations = aggregations; + this.numberOfHits = numberOfHits; + this.took = took; + this.timedOut = timedOut; + } + + public Map getAggregations() { + return aggregations; + } + + public List> getHits() { + return hits; + } + + public long getNumberOfHits() { + return numberOfHits; + } + + public boolean isTimedOut() { + return timedOut; + } + + public int getTook() { + return took; + } + + @Override + public String toString() { + return "SearchResponse{" + + "hits=" + hits + + ", aggregations=" + aggregations + + ", numberOfHits=" + numberOfHits + + '}'; + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml index ab5714de49..49004b0e87 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml @@ -36,6 +36,12 @@ nifi-distributed-cache-client-service-api compile + + org.apache.nifi + nifi-elasticsearch-client-service-api + 1.6.0-SNAPSHOT + compile + org.apache.nifi nifi-load-distribution-service-api diff --git a/nifi-nar-bundles/nifi-standard-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/pom.xml index fb6c4d5238..340ca66ce6 100644 --- a/nifi-nar-bundles/nifi-standard-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/pom.xml @@ -25,6 +25,7 @@ nifi-distributed-cache-client-service-api nifi-distributed-cache-services-bundle + nifi-elasticsearch-client-service-api nifi-load-distribution-service-api nifi-http-context-map-api nifi-lookup-service-api diff --git a/pom.xml b/pom.xml index 708810013e..1aa529f285 100644 --- a/pom.xml +++ b/pom.xml @@ -139,7 +139,7 @@ that live in the top-level lib directory and will be present in the parent-first classloading of all child nars. Therefore we dont want child nars using different versions anyway.--> - + javax.servlet @@ -242,7 +242,7 @@ 1.3 test - + org.eclipse.jetty @@ -663,18 +663,18 @@ org.apache.maven.plugins maven-checkstyle-plugin - - - check-style - verify - - UTF-8 - - - check - - - + + + check-style + verify + + UTF-8 + + + check + + +