diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml index c07cedf690..a9bbe6f911 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/pom.xml @@ -26,6 +26,101 @@ org.apache.nifi nifi-flume-processors + + + + org.apache.hadoop + hadoop-client + + + org.xerial.snappy + snappy-java + + + com.thoughtworks.paranamer + paranamer + + + org.codehaus.jackson + jackson-mapper-asl + + + org.codehaus.jackson + jackson-core-asl + + + com.google.guava + guava + + + com.google.guava + guava + + + commons-logging + commons-logging + + + org.apache.commons + commons-compress + + + commons-codec + commons-codec + + + org.apache.avro + avro + + + org.apache.zookeeper + zookeeper + + + commons-lang + commons-lang + + + commons-cli + commons-cli + + + commons-collections + commons-collections + + + commons-io + commons-io + + + com.google.code.gson + gson + + + org.apache.httpcomponents + httpclient + + + org.apache.httpcomponents + httpcore + + + org.mortbay.jetty + jetty + + + org.mortbay.jetty + jetty-util + + + log4j + log4j + + + com.google.protobuf + protobuf-java + + org.apache.nifi diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/src/main/resources/META-INF/LICENSE b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..c1a3ec4fd2 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,319 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for the these +subcomponents is subject to the terms and conditions of the following +licenses. + +The binary distribution of this product bundles 'SUAsync Library' which is +available under a 3-Clause BSD License. + + Copyright (c) 2010 StumbleUpon, Inc. All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + - Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + - Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + - Neither the name of the StumbleUpon nor the names of its contributors + may be used to endorse or promote products derived from this software + without specific prior written permission. + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + POSSIBILITY OF SUCH DAMAGE. + +The binary distribution of this product bundles 'Asynchronous HBase Client' +which is available under a 3-Clause BSD License. + + Copyright (C) 2010-2012 The Async HBase Authors. All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + - Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + - Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + - Neither the name of the StumbleUpon nor the names of its contributors + may be used to endorse or promote products derived from this software + without specific prior written permission. + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + POSSIBILITY OF SUCH DAMAGE. + +The binary distribution of this product bundles 'JOpt Simple' which is +available under the MIT license. + + The MIT License + + Copyright (c) 2004-2011 Paul R. Holser, Jr. + + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +The binary distribution of this product bundles 'Scala Library' under a BSD +style license. + + Copyright (c) 2002-2015 EPFL + Copyright (c) 2011-2015 Typesafe, Inc. + + All rights reserved. + + Redistribution and use in source and binary forms, with or without modification, + are permitted provided that the following conditions are met: + + Redistributions of source code must retain the above copyright notice, this list of + conditions and the following disclaimer. + + Redistributions in binary form must reproduce the above copyright notice, this list of + conditions and the following disclaimer in the documentation and/or other materials + provided with the distribution. + + Neither the name of the EPFL nor the names of its contributors may be used to endorse + or promote products derived from this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS “AS IS” AND ANY EXPRESS + OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY + AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER + IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT + OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/src/main/resources/META-INF/NOTICE b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..793746f194 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,145 @@ +nifi-social-media-nar +Copyright 2015 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Avro + The following NOTICE information applies: + Apache Avro + Copyright 2009-2012 The Apache Software Foundation + + (ASLv2) Apache Commons JEXL + The following NOTICE information applies: + Apache Commons JEXL + Copyright 2001-2011 The Apache Software Foundation + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Apache Flume + The following NOTICE information applies: + Apache Flume + Copyright 2011-2015 Apache Software Foundation + + (ASLv2) IRClib + The following NOTICE information applies: + IRClib -- A Java Internet Relay Chat library -- + Copyright (C) 2002 - 2006 Christoph Schwering + + (ASLv2) Jackson JSON processor + The following NOTICE information applies: + # Jackson JSON processor + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has + been in development since 2007. + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + ## Licensing + + Jackson core and extension components may licensed under different licenses. + To find the details that apply to this artifact see the accompanying LICENSE file. + For more information, including possible other licensing options, contact + FasterXML.com (http://fasterxml.com). + + ## Credits + + A list of contributors may be found from CREDITS file, which is included + in some artifacts (usually source distributions); but is always available + from the source code management (SCM) system project uses. + + (ASLv2) Joda-Time + The following NOTICE information applies: + ============================================================================= + = NOTICE file corresponding to section 4d of the Apache License Version 2.0 = + ============================================================================= + This product includes software developed by + Joda.org (http://www.joda.org/). + + (ASLv2) Apache Kafka + The following NOTICE information applies: + Apache Kafka + Copyright 2012 The Apache Software Foundation. + + (ASLv2) Kite SDK + The following NOTICE information applies: + This product includes software developed by Cloudera, Inc. + (http://www.cloudera.com/). + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + This product includes software developed by + Saxonica (http://www.saxonica.com/). + + (ASLv2) Apache Thrift + The following NOTICE information applies: + Apache Thrift + Copyright 2006-2010 The Apache Software Foundation. + + (ASLv2) Yammer Metrics + The following NOTICE information applies: + Metrics + Copyright 2010-2012 Coda Hale and Yammer, Inc. + + This product includes software developed by Coda Hale and Yammer, Inc. + + This product includes code derived from the JSR-166 project (ThreadLocalRandom), which was released + with the following comments: + + Written by Doug Lea with assistance from members of JCP JSR-166 + Expert Group and released to the public domain, as explained at + http://creativecommons.org/publicdomain/zero/1.0/ + + (ASLv2) Apache MINA + The following NOTICE information applies: + Apache MINA Core + Copyright 2004-2011 Apache MINA Project + + (ASLv2) The Netty Project + The following NOTICE information applies: + The Netty Project + Copyright 2011 The Netty Project + + (ASLv2) opencsv (net.sf.opencsv:opencsv:2.3) + + (ASLv2) Parquet MR + The following NOTICE information applies: + Parquet MR + Copyright 2012 Twitter, Inc. + + This project includes code from https://github.com/lemire/JavaFastPFOR + parquet-column/src/main/java/parquet/column/values/bitpacking/LemireBitPacking.java + Apache License Version 2.0 http://www.apache.org/licenses/. + (c) Daniel Lemire, http://lemire.me/en/ + + (ASLv2) Servlet Specification API (org.mortbay.jetty:servlet-api:2.5-20110124) + + (ASLv2) Twitter4J + The following NOTICE information applies: + Copyright 2007 Yusuke Yamamoto + + Twitter4J includes software from JSON.org to parse JSON response from the Twitter API. You can see the license term at http://www.JSON.org/license.html + + (ASLv2) Apache Velocity + The following NOTICE information applies: + Apache Velocity + Copyright (C) 2000-2007 The Apache Software Foundation + + (ASLv2) ZkClient + The following NOTICE information applies: + ZkClient + Copyright 2009 Stefan Groschupf \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml index 1dad25f6df..167aa6e943 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml @@ -21,6 +21,11 @@ nifi-flume-processors jar + + + 1.6.0 + + org.apache.nifi @@ -41,12 +46,12 @@ org.apache.flume flume-ng-sdk - 1.5.2 + ${flume.version} org.apache.flume flume-ng-core - 1.5.2 + ${flume.version} org.slf4j @@ -56,27 +61,38 @@ - - org.apache.flume.flume-ng-sources - flume-twitter-source - 1.5.2 - + org.apache.flume.flume-ng-sources flume-jms-source - 1.5.2 + ${flume.version} + + + org.apache.flume.flume-ng-sources + flume-kafka-source + ${flume.version} org.apache.flume.flume-ng-sources flume-scribe-source - 1.5.2 + ${flume.version} + + + org.apache.flume.flume-ng-sources + flume-twitter-source + ${flume.version} + + org.apache.flume.flume-ng-sinks + flume-dataset-sink + ${flume.version} + org.apache.flume.flume-ng-sinks flume-hdfs-sink - 1.5.2 + ${flume.version} @@ -97,25 +113,35 @@ provided + + org.apache.flume.flume-ng-sinks + flume-hive-sink + ${flume.version} + org.apache.flume.flume-ng-sinks flume-irc-sink - 1.5.2 + ${flume.version} org.apache.flume.flume-ng-sinks flume-ng-elasticsearch-sink - 1.5.2 + ${flume.version} org.apache.flume.flume-ng-sinks flume-ng-hbase-sink - 1.5.2 + ${flume.version} + + + org.apache.flume.flume-ng-sinks + flume-ng-kafka-sink + ${flume.version} org.apache.flume.flume-ng-sinks flume-ng-morphline-solr-sink - 1.5.2 + ${flume.version} diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java index 83ae9e1aeb..9b75047a1e 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/AbstractFlumeProcessor.java @@ -77,7 +77,7 @@ public abstract class AbstractFlumeProcessor extends AbstractSessionFactoryProce public ValidationResult validate(final String subject, final String value, final ValidationContext context) { String reason = null; try { - FlumeSourceProcessor.SOURCE_FACTORY.create("NiFi Source", value); + ExecuteFlumeSource.SOURCE_FACTORY.create("NiFi Source", value); } catch (Exception ex) { reason = ex.getLocalizedMessage(); reason = Character.toLowerCase(reason.charAt(0)) + reason.substring(1); @@ -97,7 +97,7 @@ public abstract class AbstractFlumeProcessor extends AbstractSessionFactoryProce public ValidationResult validate(final String subject, final String value, final ValidationContext context) { String reason = null; try { - FlumeSinkProcessor.SINK_FACTORY.create("NiFi Sink", value); + ExecuteFlumeSink.SINK_FACTORY.create("NiFi Sink", value); } catch (Exception ex) { reason = ex.getLocalizedMessage(); reason = Character.toLowerCase(reason.charAt(0)) + reason.substring(1); diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java similarity index 97% rename from nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java rename to nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java index 2d8506d578..8ccb2d14aa 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSinkProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSink.java @@ -25,6 +25,7 @@ import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.Sink; import org.apache.flume.conf.Configurables; +import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -45,7 +46,8 @@ import org.apache.nifi.processor.util.StandardValidators; */ @Tags({"flume", "hadoop", "get", "sink"}) @CapabilityDescription("Write FlowFile data to a Flume sink") -public class FlumeSinkProcessor extends AbstractFlumeProcessor { +@TriggerSerially +public class ExecuteFlumeSink extends AbstractFlumeProcessor { public static final PropertyDescriptor SINK_TYPE = new PropertyDescriptor.Builder() .name("Sink Type") diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java similarity index 93% rename from nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java rename to nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java index 55b1f2f8ec..fa02750df6 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/FlumeSourceProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/ExecuteFlumeSource.java @@ -29,6 +29,7 @@ import org.apache.flume.Source; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.conf.Configurables; import org.apache.flume.source.EventDrivenSourceRunner; +import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -50,7 +51,8 @@ import org.apache.nifi.processor.util.StandardValidators; */ @Tags({"flume", "hadoop", "get", "source"}) @CapabilityDescription("Generate FlowFile data from a Flume source") -public class FlumeSourceProcessor extends AbstractFlumeProcessor { +@TriggerSerially +public class ExecuteFlumeSource extends AbstractFlumeProcessor { public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder() .name("Source Type") @@ -113,17 +115,12 @@ public class FlumeSourceProcessor extends AbstractFlumeProcessor { public void onScheduled(final SchedulingContext context) { try { source = SOURCE_FACTORY.create( - context.getProperty(SOURCE_NAME) - .getValue(), - context.getProperty(SOURCE_TYPE) - .getValue()); + context.getProperty(SOURCE_NAME).getValue(), + context.getProperty(SOURCE_TYPE).getValue()); - String flumeConfig = context.getProperty(FLUME_CONFIG) - .getValue(); - String agentName = context.getProperty(AGENT_NAME) - .getValue(); - String sourceName = context.getProperty(SOURCE_NAME) - .getValue(); + String flumeConfig = context.getProperty(FLUME_CONFIG).getValue(); + String agentName = context.getProperty(AGENT_NAME).getValue(); + String sourceName = context.getProperty(SOURCE_NAME).getValue(); Configurables.configure(source, getFlumeSourceContext(flumeConfig, agentName, sourceName)); @@ -133,8 +130,7 @@ public class FlumeSourceProcessor extends AbstractFlumeProcessor { source.start(); } } catch (Throwable th) { - getLogger() - .error("Error creating source", th); + getLogger().error("Error creating source", th); throw Throwables.propagate(th); } } @@ -191,6 +187,8 @@ public class FlumeSourceProcessor extends AbstractFlumeProcessor { } catch (EventDeliveryException ex) { throw new ProcessException("Error processing pollable source", ex); } + } else { + throw new ProcessException("Invalid source type: " + source.getClass().getSimpleName()); } } } diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiSessionFactoryChannel.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiSessionFactoryChannel.java index bc565878ba..eb31a66420 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiSessionFactoryChannel.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiSessionFactoryChannel.java @@ -39,7 +39,6 @@ public class NifiSessionFactoryChannel extends BasicChannelSemantics { LifecycleState lifecycleState = getLifecycleState(); if (lifecycleState == LifecycleState.STOP) { throw new ChannelFullException("Can't write to a stopped channel"); - //return null; } return new NifiTransaction(sessionFactory.createSession(), relationship); } diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java index 5dc97d65e7..fdff203544 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java @@ -26,8 +26,13 @@ import org.apache.flume.Event; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.io.InputStreamCallback; +import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.ENTRY_DATE_HEADER; +import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.ID_HEADER; +import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.LAST_QUEUE_DATE_HEADER; +import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.LINEAGE_IDENTIFIERS_HEADER; +import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.LINEAGE_START_DATE_HEADER; +import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.SIZE_HEADER; -import static org.apache.nifi.processors.flume.util.FlowFileEventConstants.*; import org.apache.nifi.stream.io.BufferedInputStream; import org.apache.nifi.stream.io.StreamUtils; @@ -105,7 +110,7 @@ public class FlowFileEvent implements Event { @Override public void process(InputStream in) throws IOException { try (BufferedInputStream input = new BufferedInputStream(in)) { - StreamUtils.copy(in, baos); + StreamUtils.copy(input, baos); } baos.close(); } diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/ExecuteFlumeSinkTest.java similarity index 84% rename from nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java rename to nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/ExecuteFlumeSinkTest.java index 0654138cfd..6a0c40d13d 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/ExecuteFlumeSinkTest.java @@ -45,17 +45,17 @@ import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FlumeSinkProcessorTest { +public class ExecuteFlumeSinkTest { private static final Logger logger = - LoggerFactory.getLogger(FlumeSinkProcessorTest.class); + LoggerFactory.getLogger(ExecuteFlumeSinkTest.class); @Rule public final TemporaryFolder temp = new TemporaryFolder(); @Test public void testValidators() { - TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class); + TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSink.class); Collection results; ProcessContext pc; @@ -73,7 +73,7 @@ public class FlumeSinkProcessorTest { // non-existent class results = new HashSet<>(); - runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "invalid.class.name"); + runner.setProperty(ExecuteFlumeSink.SINK_TYPE, "invalid.class.name"); runner.enqueue(new byte[0]); pc = runner.getProcessContext(); if (pc instanceof MockProcessContext) { @@ -87,7 +87,7 @@ public class FlumeSinkProcessorTest { // class doesn't implement Sink results = new HashSet<>(); - runner.setProperty(FlumeSinkProcessor.SINK_TYPE, AvroSource.class.getName()); + runner.setProperty(ExecuteFlumeSink.SINK_TYPE, AvroSource.class.getName()); runner.enqueue(new byte[0]); pc = runner.getProcessContext(); if (pc instanceof MockProcessContext) { @@ -100,7 +100,7 @@ public class FlumeSinkProcessorTest { } results = new HashSet<>(); - runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName()); + runner.setProperty(ExecuteFlumeSink.SINK_TYPE, NullSink.class.getName()); runner.enqueue(new byte[0]); pc = runner.getProcessContext(); if (pc instanceof MockProcessContext) { @@ -112,8 +112,8 @@ public class FlumeSinkProcessorTest { @Test public void testNullSink() throws IOException { - TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class); - runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName()); + TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSink.class); + runner.setProperty(ExecuteFlumeSink.SINK_TYPE, NullSink.class.getName()); try (InputStream inputStream = getClass().getResourceAsStream("/testdata/records.txt")) { Map attributes = new HashMap<>(); attributes.put(CoreAttributes.FILENAME.key(), "records.txt"); @@ -124,9 +124,9 @@ public class FlumeSinkProcessorTest { @Test public void testBatchSize() throws IOException { - TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class); - runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName()); - runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG, + TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSink.class); + runner.setProperty(ExecuteFlumeSink.SINK_TYPE, NullSink.class.getName()); + runner.setProperty(ExecuteFlumeSink.FLUME_CONFIG, "tier1.sinks.sink-1.batchSize = 1000\n"); for (int i = 0; i < 100000; i++) { runner.enqueue(String.valueOf(i).getBytes()); @@ -138,9 +138,9 @@ public class FlumeSinkProcessorTest { public void testHdfsSink() throws IOException { File destDir = temp.newFolder("hdfs"); - TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class); - runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "hdfs"); - runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG, + TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSink.class); + runner.setProperty(ExecuteFlumeSink.SINK_TYPE, "hdfs"); + runner.setProperty(ExecuteFlumeSink.FLUME_CONFIG, "tier1.sinks.sink-1.hdfs.path = " + destDir.toURI().toString() + "\n" + "tier1.sinks.sink-1.hdfs.fileType = DataStream\n" + "tier1.sinks.sink-1.hdfs.serializer = TEXT\n" + diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/ExecuteFlumeSourceTest.java similarity index 82% rename from nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java rename to nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/ExecuteFlumeSourceTest.java index bf32095f7b..924776ee5d 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/ExecuteFlumeSourceTest.java @@ -41,16 +41,16 @@ import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FlumeSourceProcessorTest { +public class ExecuteFlumeSourceTest { - private static final Logger logger = LoggerFactory.getLogger(FlumeSourceProcessorTest.class); + private static final Logger logger = LoggerFactory.getLogger(ExecuteFlumeSourceTest.class); @Rule public final TemporaryFolder temp = new TemporaryFolder(); @Test public void testValidators() { - TestRunner runner = TestRunners.newTestRunner(FlumeSourceProcessor.class); + TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSource.class); Collection results; ProcessContext pc; @@ -68,7 +68,7 @@ public class FlumeSourceProcessorTest { // non-existent class results = new HashSet<>(); - runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "invalid.class.name"); + runner.setProperty(ExecuteFlumeSource.SOURCE_TYPE, "invalid.class.name"); runner.enqueue(new byte[0]); pc = runner.getProcessContext(); if (pc instanceof MockProcessContext) { @@ -82,7 +82,7 @@ public class FlumeSourceProcessorTest { // class doesn't implement Source results = new HashSet<>(); - runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, NullSink.class.getName()); + runner.setProperty(ExecuteFlumeSource.SOURCE_TYPE, NullSink.class.getName()); runner.enqueue(new byte[0]); pc = runner.getProcessContext(); if (pc instanceof MockProcessContext) { @@ -95,7 +95,7 @@ public class FlumeSourceProcessorTest { } results = new HashSet<>(); - runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, AvroSource.class.getName()); + runner.setProperty(ExecuteFlumeSource.SOURCE_TYPE, AvroSource.class.getName()); runner.enqueue(new byte[0]); pc = runner.getProcessContext(); if (pc instanceof MockProcessContext) { @@ -106,10 +106,10 @@ public class FlumeSourceProcessorTest { @Test public void testSequenceSource() { - TestRunner runner = TestRunners.newTestRunner(FlumeSourceProcessor.class); - runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "seq"); + TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSource.class); + runner.setProperty(ExecuteFlumeSource.SOURCE_TYPE, "seq"); runner.run(); - List flowFiles = runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS); + List flowFiles = runner.getFlowFilesForRelationship(ExecuteFlumeSource.SUCCESS); Assert.assertEquals(1, flowFiles.size()); for (MockFlowFile flowFile : flowFiles) { logger.debug(flowFile.toString()); @@ -123,16 +123,16 @@ public class FlumeSourceProcessorTest { File dst = new File(spoolDirectory, "records.txt"); FileUtils.copyFile(getClass().getResourceAsStream("/testdata/records.txt"), dst, true, false); - TestRunner runner = TestRunners.newTestRunner(FlumeSourceProcessor.class); - runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "spooldir"); - runner.setProperty(FlumeSinkProcessor.FLUME_CONFIG, + TestRunner runner = TestRunners.newTestRunner(ExecuteFlumeSource.class); + runner.setProperty(ExecuteFlumeSource.SOURCE_TYPE, "spooldir"); + runner.setProperty(ExecuteFlumeSink.FLUME_CONFIG, "tier1.sources.src-1.spoolDir = " + spoolDirectory.getAbsolutePath()); runner.run(1, false, true); // Because the spool directory source is an event driven source, it may take some time for flow files to get // produced. I'm willing to wait up to 5 seconds, but will bail out early if possible. If it takes longer than // that then there is likely a bug. int numWaits = 10; - while (runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS).size() < 4 && --numWaits > 0) { + while (runner.getFlowFilesForRelationship(ExecuteFlumeSource.SUCCESS).size() < 4 && --numWaits > 0) { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException ex) { @@ -140,9 +140,9 @@ public class FlumeSourceProcessorTest { } } runner.shutdown(); - runner.assertTransferCount(FlumeSourceProcessor.SUCCESS, 4); + runner.assertTransferCount(ExecuteFlumeSource.SUCCESS, 4); int i = 1; - for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS)) { + for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(ExecuteFlumeSource.SUCCESS)) { flowFile.assertContentEquals("record " + i); i++; } diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml index a2742aa3a1..59aab3c8ee 100644 --- a/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/pom.xml @@ -32,7 +32,7 @@ org.apache.nifi nifi-flume-processors - 0.1.0-incubating-SNAPSHOT + 0.2.0-incubating-SNAPSHOT diff --git a/nifi/pom.xml b/nifi/pom.xml index 3a5c6b78b0..66a328c9c5 100644 --- a/nifi/pom.xml +++ b/nifi/pom.xml @@ -809,7 +809,7 @@ org.apache.nifi nifi-flume-nar - 0.1.0-incubating-SNAPSHOT + 0.2.0-incubating-SNAPSHOT nar @@ -864,10 +864,10 @@ 2.0.0 - org.apache.derby - derby - 10.11.1.1 - + org.apache.derby + derby + 10.11.1.1 +