From 259f5bba47b2890af936419e27a4dff141a7aeaa Mon Sep 17 00:00:00 2001 From: mans2singh Date: Thu, 2 Jun 2016 18:51:49 -0700 Subject: [PATCH] NIFI-1972 - Ignite processor This closes #502. Signed-off-by: Pierre Villard --- nifi-assembly/NOTICE | 15 + nifi-assembly/pom.xml | 5 + .../nifi-ignite-nar/pom.xml | 44 ++ .../src/main/resources/META-INF/LICENSE | 209 +++++++ .../src/main/resources/META-INF/NOTICE | 31 + .../nifi-ignite-processors/pom.xml | 75 +++ .../ignite/AbstractIgniteProcessor.java | 107 ++++ .../cache/AbstractIgniteCacheProcessor.java | 120 ++++ .../ignite/cache/PutIgniteCache.java | 392 ++++++++++++ .../org.apache.nifi.processor.Processor | 15 + .../src/main/resources/ignite-client.xml | 26 + .../main/resources/ignite-default-client.xml | 45 ++ .../ignite/cache/ITPutIgniteCache.java | 132 ++++ .../ignite/cache/TestPutIgniteCache.java | 581 ++++++++++++++++++ .../src/test/resources/test-default.xml | 44 ++ .../src/test/resources/test-ignite.xml | 26 + nifi-nar-bundles/nifi-ignite-bundle/pom.xml | 43 ++ nifi-nar-bundles/pom.xml | 1 + pom.xml | 21 + 19 files changed, 1932 insertions(+) create mode 100644 nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-nar/src/main/resources/META-INF/LICENSE create mode 100644 nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/pom.xml create mode 100644 nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/AbstractIgniteProcessor.java create mode 100644 nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java create mode 100644 nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/PutIgniteCache.java create mode 100644 nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor create mode 100644 nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/ignite-client.xml create mode 100644 nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/ignite-default-client.xml create mode 100644 nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITPutIgniteCache.java create mode 100644 nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestPutIgniteCache.java create mode 100644 nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/resources/test-default.xml create mode 100644 nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/resources/test-ignite.xml create mode 100644 nifi-nar-bundles/nifi-ignite-bundle/pom.xml diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE index 67deaa3108..7ccfc829fc 100644 --- a/nifi-assembly/NOTICE +++ b/nifi-assembly/NOTICE @@ -798,6 +798,21 @@ The following binary components are provided under the Apache Software License v This product includes software developed by The Groovy community (http://groovy.codehaus.org/). + (ASLv2) Apache Ignite + The following NOTICE information applies: + Apache Ignite + Copyright 2015 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + + This software includes code from IntelliJ IDEA Community Edition + Copyright (C) JetBrains s.r.o. + https://www.jetbrains.com/idea/ + Licensed under Apache License, Version 2.0. + http://search.maven.org/#artifactdetails%7Corg.jetbrains%7Cannotations%7C13.0%7Cjar + (ASLv2) Carrotsearch HPPC The following NOTICE information applies: HPPC borrowed code, ideas or both from: diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index e2b8595f25..1564313444 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -241,6 +241,11 @@ language governing permissions and limitations under the License. --> nifi-ambari-nar nar + + org.apache.nifi + nifi-ignite-nar + nar + org.apache.nifi nifi-avro-nar diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-nar/pom.xml b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-nar/pom.xml new file mode 100644 index 0000000000..4355289d6f --- /dev/null +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-nar/pom.xml @@ -0,0 +1,44 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-ignite-bundle + 1.0.0-SNAPSHOT + + + nifi-ignite-nar + nar + + true + true + + + + + org.apache.nifi + nifi-standard-services-api-nar + nar + + + org.apache.nifi + nifi-ignite-processors + + + + diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..44893cdb29 --- /dev/null +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,209 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for the these +subcomponents is subject to the terms and conditions of the following +licenses. diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..59627056ef --- /dev/null +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,31 @@ +nifi-ignite-nar +Copyright 2016 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +=========================================== +Apache Software License v2 +=========================================== + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Ignite + The following NOTICE information applies: + Apache Ignite + Copyright 2015 The Apache Software Foundation + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + + This software includes code from IntelliJ IDEA Community Edition + Copyright (C) JetBrains s.r.o. + https://www.jetbrains.com/idea/ + Licensed under Apache License, Version 2.0. + http://search.maven.org/#artifactdetails%7Corg.jetbrains%7Cannotations%7C13.0%7Cjar + + (ASLv2) Apache Commons IO + The following NOTICE information applies: + Apache Commons IO + Copyright 2002-2012 The Apache Software Foundation diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/pom.xml b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/pom.xml new file mode 100644 index 0000000000..7f599fffc3 --- /dev/null +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/pom.xml @@ -0,0 +1,75 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-ignite-bundle + 1.0.0-SNAPSHOT + + + nifi-ignite-processors + jar + + + + org.apache.ignite + ignite-core + + + org.apache.ignite + ignite-log4j2 + test + + + org.apache.ignite + ignite-spring + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-processor-utils + + + commons-io + commons-io + + + org.apache.nifi + nifi-mock + test + + + org.slf4j + slf4j-simple + test + + + junit + junit + test + + + com.google.guava + guava + test + + + diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/AbstractIgniteProcessor.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/AbstractIgniteProcessor.java new file mode 100644 index 0000000000..9c455af203 --- /dev/null +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/AbstractIgniteProcessor.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.ignite; + +import org.apache.commons.lang3.StringUtils; +import org.apache.ignite.Ignite; +import org.apache.ignite.Ignition; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; + +/** + * Base class for Ignite processors + */ +public abstract class AbstractIgniteProcessor extends AbstractProcessor { + + /** + * Ignite spring configuration file + */ + public static final PropertyDescriptor IGNITE_CONFIGURATION_FILE = new PropertyDescriptor.Builder() + .displayName("Ignite Spring Properties Xml File") + .name("ignite-spring-properties-xml-file") + .description("Ignite spring configuration file, /.xml. If the " + + "configuration file is not provided, default Ignite configuration " + + "configuration is used which binds to 127.0.0.1:47500..47509") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + /** + * Success relation + */ + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("All FlowFiles that are written to Ignite cache are routed to this relationship").build(); + + /** + * Failure relation + */ + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("All FlowFiles that cannot be written to Ignite cache are routed to this relationship").build(); + + /** + * The ignite instance + */ + private transient Ignite ignite; + + /** + * Get ignite instance + * @return iginte instance + */ + protected Ignite getIgnite() { + return ignite; + } + + /** + * Close ignite instance + */ + public void closeIgnite() { + if (ignite != null) { + getLogger().info("Closing ignite client"); + ignite.close(); + ignite = null; + } + } + + /** + * Initialize ignite instance + * @param context process context + */ + public void initializeIgnite(ProcessContext context) { + + if ( getIgnite() != null ) { + getLogger().warn("Ignite already initialized"); + return; + } + + Ignition.setClientMode(true); + + String configuration = context.getProperty(IGNITE_CONFIGURATION_FILE).getValue(); + getLogger().info("Initializing ignite with configuration {} ", new Object[] { configuration }); + if ( StringUtils.isEmpty(configuration) ) { + ignite = Ignition.start(); + } else { + ignite = Ignition.start(configuration); + } + + } + +} diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java new file mode 100644 index 0000000000..8e1a7cb008 --- /dev/null +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.ignite.cache; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.ignite.IgniteCache; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.ignite.AbstractIgniteProcessor; + +/** + * Base class of Ignite cache based processor + */ +public abstract class AbstractIgniteCacheProcessor extends AbstractIgniteProcessor { + + /** + * Ignite cache name + */ + protected static final PropertyDescriptor CACHE_NAME = new PropertyDescriptor.Builder() + .displayName("Ignite Cache Name") + .name("ignite-cache-name") + .description("The name of the ignite cache") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + /** + * Property descriptors + */ + protected static List descriptors; + + /** + * Relations + */ + protected static Set relationships; + + /** + * Ignite cache instance + */ + private transient IgniteCache igniteCache; + + /** + * Get ignite cache instance + * @return ignite cache instance + */ + protected IgniteCache getIgniteCache() { + return igniteCache; + } + + static { + final Set rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(rels); + } + + @Override + public Set getRelationships() { + return relationships; + } + + /** + * Initialize the ignite cache instance + * @param context process context + * @throws ProcessException if there is a problem while scheduling the processor + */ + public void initializeIgniteCache(ProcessContext context) throws ProcessException { + + getLogger().info("Initializing Ignite cache"); + + try { + if ( getIgnite() == null ) { + getLogger().info("Initializing ignite as client"); + super.initializeIgnite(context); + } + + String cacheName = context.getProperty(CACHE_NAME).getValue(); + igniteCache = getIgnite().getOrCreateCache(cacheName); + + } catch (Exception e) { + getLogger().error("Failed to initialize ignite cache due to {}", new Object[] { e }, e); + throw new ProcessException(e); + } + } + + /** + * Close Ignite cache instance and calls base class closeIgnite + */ + @OnStopped + public void closeIgniteCache() { + if (igniteCache != null) { + getLogger().info("Closing ignite cache"); + igniteCache.close(); + igniteCache = null; + } + super.closeIgnite(); + } +} diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/PutIgniteCache.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/PutIgniteCache.java new file mode 100644 index 0000000000..c61649f7b8 --- /dev/null +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/PutIgniteCache.java @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.ignite.cache; + +import java.io.IOException; +import java.io.InputStream; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression.ResultType; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; + +/** + * Put cache processors which pushes the flow file content into Ignite Cache using + * DataStreamer interface + */ +@EventDriven +@SupportsBatching +@Tags({ "Ignite", "insert", "update", "stream", "write", "put", "cache", "key" }) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Stream the contents of a FlowFile to Ignite Cache using DataStreamer. " + + "The processor uses the value of FlowFile attribute (Ignite cache entry key) as the " + + "cache key and the byte array of the FlowFile as the value of the cache entry value. Both the string key and a " + + " non-empty byte array value are required otherwise the FlowFile is transfered to the failure relation. " + + "Note - The Ignite Kernel periodically outputs node performance statistics to the logs. This message " + + " can be turned off by setting the log level for logger 'org.apache.ignite' to WARN in the logback.xml configuration file.") +@WritesAttributes({ + @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, description = "The total number of FlowFile in the batch"), + @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, description = "The item number of FlowFile in the batch"), + @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER, description = "The successful FlowFile item number"), + @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT, description = "The number of successful FlowFiles"), + @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_ITEM_NUMBER, description = "The failed FlowFile item number"), + @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_COUNT, description = "The total number of failed FlowFiles in the batch"), + @WritesAttribute(attribute = PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, description = "The failed reason attribute key") + }) +public class PutIgniteCache extends AbstractIgniteCacheProcessor { + + /** + * The batch size of flow files to be processed on invocation of onTrigger + */ + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .displayName("Batch Size For Entries") + .name("batch-size-for-entries") + .description("Batch size for entries (1-500).") + .defaultValue("250") + .required(true) + .addValidator(StandardValidators.createLongValidator(1, 500, true)) + .sensitive(false) + .build(); + + /** + * Data streamer's per node parallelism + */ + public static final PropertyDescriptor DATA_STREAMER_PER_NODE_PARALLEL_OPERATIONS = new PropertyDescriptor.Builder() + .displayName("Data Streamer Per Node Parallel Operations") + .name("data-streamer-per-node-parallel-operations") + .description("Data streamer per node parallelism") + .defaultValue("5") + .required(true) + .addValidator(StandardValidators.createLongValidator(1, 10, true)) + .sensitive(false) + .build(); + + /** + * Data streamers per node buffer size + */ + public static final PropertyDescriptor DATA_STREAMER_PER_NODE_BUFFER_SIZE = new PropertyDescriptor.Builder() + .displayName("Data Streamer Per Node Buffer Size") + .name("data-streamer-per-node-buffer-size") + .description("Data streamer per node buffer size (1-500).") + .defaultValue("250") + .required(true) + .addValidator(StandardValidators.createLongValidator(1, 500, true)) + .sensitive(false) + .build(); + + /** + * Data streamers auto flush frequency + */ + public static final PropertyDescriptor DATA_STREAMER_AUTO_FLUSH_FREQUENCY = new PropertyDescriptor.Builder() + .displayName("Data Streamer Auto Flush Frequency in millis") + .name("data-streamer-auto-flush-frequency-in-millis") + .description("Data streamer flush interval in millis seconds") + .defaultValue("10") + .required(true) + .addValidator(StandardValidators.createLongValidator(1, 100, true)) + .sensitive(false) + .build(); + + /** + * Data streamers override values property + */ + public static final PropertyDescriptor DATA_STREAMER_ALLOW_OVERRIDE = new PropertyDescriptor.Builder() + .displayName("Data Streamer Allow Override") + .name("data-streamer-allow-override") + .description("Whether to override values already in the cache") + .defaultValue("false") + .required(true) + .allowableValues(new AllowableValue("true"), new AllowableValue("false")) + .sensitive(false) + .build(); + + public static final PropertyDescriptor IGNITE_CACHE_ENTRY_KEY = new PropertyDescriptor.Builder() + .displayName("Ignite Cache Entry Identifier") + .name("ignite-cache-entry-identifier") + .description("A FlowFile attribute, or attribute expression used " + + "for determining Ignite cache key for the Flow File content") + .required(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) + .expressionLanguageSupported(true) + .build(); + + /** Flow file attribute keys and messages */ + public static final String IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT = "ignite.cache.batch.flow.file.total.count"; + public static final String IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER = "ignite.cache.batch.flow.file.item.number"; + public static final String IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT = "ignite.cache.batch.flow.file.successful.count"; + public static final String IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER = "ignite.cache.batch.flow.file.successful.number"; + public static final String IGNITE_BATCH_FLOW_FILE_FAILED_COUNT = "ignite.cache.batch.flow.file.failed.count"; + public static final String IGNITE_BATCH_FLOW_FILE_FAILED_ITEM_NUMBER = "ignite.cache.batch.flow.file.failed.number"; + public static final String IGNITE_BATCH_FLOW_FILE_FAILED_FILE_SIZE = "ignite.cache.batch.flow.file.failed.size"; + public static final String IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY = "ignite.cache.batch.flow.file.failed.reason"; + public static final String IGNITE_BATCH_FLOW_FILE_FAILED_MISSING_KEY_MESSAGE = "The FlowFile key attribute was missing"; + public static final String IGNITE_BATCH_FLOW_FILE_FAILED_ZERO_SIZE_MESSAGE = "The FlowFile size was zero"; + + static { + descriptors = new ArrayList<>(); + descriptors.add(IGNITE_CONFIGURATION_FILE); + descriptors.add(CACHE_NAME); + descriptors.add(BATCH_SIZE); + descriptors.add(IGNITE_CACHE_ENTRY_KEY); + descriptors.add(DATA_STREAMER_PER_NODE_PARALLEL_OPERATIONS); + descriptors.add(DATA_STREAMER_PER_NODE_BUFFER_SIZE); + descriptors.add(DATA_STREAMER_AUTO_FLUSH_FREQUENCY); + descriptors.add(DATA_STREAMER_ALLOW_OVERRIDE); + } + + /** + * Data streamer instance + */ + private transient IgniteDataStreamer igniteDataStreamer; + + @Override + public List getSupportedPropertyDescriptors() { + return descriptors; + } + + /** + * Close data streamer and calls base classes close ignite cache + */ + @OnStopped + public final void closeIgniteDataStreamer() { + if (igniteDataStreamer != null) { + getLogger().info("Closing ignite data streamer"); + igniteDataStreamer.flush(); + igniteDataStreamer.close(); + igniteDataStreamer = null; + } + super.closeIgniteCache(); + } + + /** + * Get data streamer + * @return data streamer instance + */ + protected IgniteDataStreamer getIgniteDataStreamer() { + return igniteDataStreamer; + } + + /** + * Initialize ignite cache + */ + @OnScheduled + public final void initilizeIgniteDataStreamer(ProcessContext context) throws ProcessException { + super.initializeIgniteCache(context); + + if ( getIgniteDataStreamer() != null ) { + return; + } + + getLogger().info("Creating Ignite Datastreamer"); + try { + int perNodeParallelOperations = context.getProperty(DATA_STREAMER_PER_NODE_PARALLEL_OPERATIONS).asInteger(); + int perNodeBufferSize = context.getProperty(DATA_STREAMER_PER_NODE_BUFFER_SIZE).asInteger(); + int autoFlushFrequency = context.getProperty(DATA_STREAMER_AUTO_FLUSH_FREQUENCY).asInteger(); + boolean allowOverride = context.getProperty(DATA_STREAMER_ALLOW_OVERRIDE).asBoolean(); + + igniteDataStreamer = getIgnite().dataStreamer(getIgniteCache().getName()); + igniteDataStreamer.perNodeBufferSize(perNodeBufferSize); + igniteDataStreamer.perNodeParallelOperations(perNodeParallelOperations); + igniteDataStreamer.autoFlushFrequency(autoFlushFrequency); + igniteDataStreamer.allowOverwrite(allowOverride); + + } catch (Exception e) { + getLogger().error("Failed to schedule PutIgnite due to {}", new Object[] { e }, e); + throw new ProcessException(e); + } + } + + /** + * Handle flow files + */ + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + final List flowFiles = session.get(batchSize); + + if (flowFiles.isEmpty()) { + return; + } + + List> cacheItems = new ArrayList<>(); + List successfulFlowFiles = new ArrayList<>(); + List failedFlowFiles = new ArrayList<>(); + try { + for (int i = 0; i < flowFiles.size(); i++) { + FlowFile flowFile = null; + try { + flowFile = flowFiles.get(i); + + String key = context.getProperty(IGNITE_CACHE_ENTRY_KEY).evaluateAttributeExpressions(flowFile).getValue(); + + if ( isFailedFlowFile(flowFile, key) ) { + failedFlowFiles.add(flowFile); + continue; + } + + final byte[] byteArray = new byte[(int) flowFile.getSize()]; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, byteArray, true); + } + }); + + cacheItems.add(new AbstractMap.SimpleEntry(key, byteArray)); + successfulFlowFiles.add(flowFile); + + } catch (Exception e) { + getLogger().error("Failed to insert {} into IgniteDB due to {}", new Object[] { flowFile, e }, e); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + } + } + } finally { + if (!cacheItems.isEmpty()) { + IgniteFuture futures = igniteDataStreamer.addData(cacheItems); + Object result = futures.get(); + getLogger().debug("Result {} of addData", new Object [] {result}); + } + + if (!successfulFlowFiles.isEmpty()) { + successfulFlowFiles = updateSuccessfulFlowFileAttributes(flowFiles, successfulFlowFiles, session); + session.transfer(successfulFlowFiles, REL_SUCCESS); + for (FlowFile flowFile : successfulFlowFiles) { + String key = context.getProperty(IGNITE_CACHE_ENTRY_KEY).evaluateAttributeExpressions(flowFile).getValue(); + session.getProvenanceReporter().send(flowFile, "ignite://cache/" + getIgniteCache().getName() + "/" + key); + } + } + + if (!failedFlowFiles.isEmpty()) { + failedFlowFiles = updateFailedFlowFileAttributes(flowFiles, failedFlowFiles, session, context); + session.transfer(failedFlowFiles, REL_FAILURE); + } + } + } + + /** + * Check if flow if corrupted (either flow file is empty or does not have a key attribute) + * @param flowFile the flow file to check + * @param key the cache key + * @return true if flow file is incomplete + */ + private boolean isFailedFlowFile(FlowFile flowFile, String key) { + if ( StringUtils.isEmpty(key) ) { + return true; + } + return flowFile.getSize() == 0; + } + + /** + * Add successful flow file attributes + * @param flowFiles all flow files + * @param successfulFlowFiles list of successful flow files + * @param session process session + * @return successful flow files with updated attributes + */ + protected List updateSuccessfulFlowFileAttributes( + List flowFiles, + List successfulFlowFiles, ProcessSession session) { + + int flowFileCount = flowFiles.size(); + int flowFileSuccessful = successfulFlowFiles.size(); + List updatedSuccessfulFlowFiles = new ArrayList<>(); + + for (int i = 0; i < flowFileSuccessful; i++) { + FlowFile flowFile = successfulFlowFiles.get(i); + Map attributes = new HashMap<>(); + attributes.put(IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER, Integer.toString(i)); + attributes.put(IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, Integer.toString(flowFileCount)); + attributes.put(IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, Integer.toString(flowFiles.indexOf(flowFile))); + attributes.put(IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT, Integer.toString(flowFileSuccessful)); + flowFile = session.putAllAttributes(flowFile, attributes); + updatedSuccessfulFlowFiles.add(flowFile); + } + + return updatedSuccessfulFlowFiles; + + } + + /** + * Add failed flow file attributes + * @param flowFiles all flow files + * @param failedFlowFiles list of failed flow files + * @param session process session + * @param context the process context + * @return failed flow files with updated attributes + */ + protected List updateFailedFlowFileAttributes( + List flowFiles, + List failedFlowFiles, ProcessSession session, ProcessContext context) { + + int flowFileCount = flowFiles.size(); + int flowFileFailed = failedFlowFiles.size(); + List updatedFailedFlowFiles = new ArrayList<>(); + + for (int i = 0; i < flowFileFailed; i++) { + FlowFile flowFile = failedFlowFiles.get(i); + + Map attributes = new HashMap<>(); + attributes.put(IGNITE_BATCH_FLOW_FILE_FAILED_ITEM_NUMBER, Integer.toString(i)); + attributes.put(IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, Integer.toString(flowFileCount)); + attributes.put(IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, Integer.toString(flowFiles.indexOf(flowFile))); + attributes.put(IGNITE_BATCH_FLOW_FILE_FAILED_COUNT, Integer.toString(flowFileFailed)); + + String key = context.getProperty(IGNITE_CACHE_ENTRY_KEY).evaluateAttributeExpressions(flowFile).getValue(); + + if (StringUtils.isEmpty(key)) { + attributes.put(IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, + IGNITE_BATCH_FLOW_FILE_FAILED_MISSING_KEY_MESSAGE); + } else if (flowFile.getSize() == 0) { + attributes.put(IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, + IGNITE_BATCH_FLOW_FILE_FAILED_ZERO_SIZE_MESSAGE); + } else { + throw new ProcessException("Unknown reason for failing file: " + flowFile); + } + + flowFile = session.putAllAttributes(flowFile, attributes); + updatedFailedFlowFiles.add(flowFile); + } + + return updatedFailedFlowFiles; + + } +} diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000000..d1da921940 --- /dev/null +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.ignite.cache.PutIgniteCache diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/ignite-client.xml b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/ignite-client.xml new file mode 100644 index 0000000000..fd9b7aab3e --- /dev/null +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/ignite-client.xml @@ -0,0 +1,26 @@ + + + + + + + + diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/ignite-default-client.xml b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/ignite-default-client.xml new file mode 100644 index 0000000000..114f87f734 --- /dev/null +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/resources/ignite-default-client.xml @@ -0,0 +1,45 @@ + + + + + + + + + + + + + + + 127.0.0.1:47500..47509 + + + + + + + + diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITPutIgniteCache.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITPutIgniteCache.java new file mode 100644 index 0000000000..e8a469eef8 --- /dev/null +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/ITPutIgniteCache.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.ignite.cache; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class ITPutIgniteCache { + + private static final String CACHE_NAME = "testCache"; + private static TestRunner runner; + private static PutIgniteCache putIgniteCache; + private static Map properties1; + private static HashMap properties2; + + @BeforeClass + public static void setUp() throws IOException { + putIgniteCache = new PutIgniteCache(); + properties1 = new HashMap(); + properties2 = new HashMap(); + } + + @AfterClass + public static void teardown() { + runner = null; + putIgniteCache = null; + } + + @Test + public void testPutIgniteCacheOnTriggerFileConfigurationOneFlowFile() throws IOException, InterruptedException { + runner = TestRunners.newTestRunner(putIgniteCache); + runner.setProperty(PutIgniteCache.BATCH_SIZE, "5"); + runner.setProperty(PutIgniteCache.CACHE_NAME, CACHE_NAME); + runner.setProperty(PutIgniteCache.IGNITE_CONFIGURATION_FILE, + "file:///" + new File(".").getAbsolutePath() + "/src/test/resources/test-ignite.xml"); + runner.setProperty(PutIgniteCache.DATA_STREAMER_PER_NODE_BUFFER_SIZE, "1"); + runner.setProperty(PutIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + runner.assertValid(); + properties1.put("igniteKey", "key5"); + runner.enqueue("test".getBytes(),properties1); + runner.run(1, false, true); + + runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 1); + List sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS); + assertEquals(1, sucessfulFlowFiles.size()); + List failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE); + assertEquals(0, failureFlowFiles.size()); + + final MockFlowFile out = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS).get(0); + + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER, "0"); + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "1"); + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT, "1"); + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0"); + + out.assertContentEquals("test".getBytes()); + Assert.assertArrayEquals("test".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key5")); + runner.shutdown(); + } + + @Test + public void testPutIgniteCacheOnTriggerNoConfigurationTwoFlowFile() throws IOException, InterruptedException { + runner = TestRunners.newTestRunner(putIgniteCache); + runner.setProperty(PutIgniteCache.BATCH_SIZE, "5"); + runner.setProperty(PutIgniteCache.CACHE_NAME, CACHE_NAME); + runner.setProperty(PutIgniteCache.DATA_STREAMER_PER_NODE_BUFFER_SIZE, "1"); + runner.setProperty(PutIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + runner.assertValid(); + properties1.put("igniteKey", "key51"); + runner.enqueue("test1".getBytes(),properties1); + properties2.put("igniteKey", "key52"); + runner.enqueue("test2".getBytes(),properties2); + runner.run(1, false, true); + + runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 2); + List sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS); + assertEquals(2, sucessfulFlowFiles.size()); + List failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE); + assertEquals(0, failureFlowFiles.size()); + + final MockFlowFile out = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS).get(0); + + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER, "0"); + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "2"); + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT, "2"); + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0"); + + out.assertContentEquals("test1".getBytes()); + Assert.assertArrayEquals("test1".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key51")); + + final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS).get(1); + + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER, "1"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "2"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT, "2"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "1"); + + out2.assertContentEquals("test2".getBytes()); + Assert.assertArrayEquals("test2".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key52")); + + runner.shutdown(); + } +} diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestPutIgniteCache.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestPutIgniteCache.java new file mode 100644 index 0000000000..eebb2372ba --- /dev/null +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/java/org/apache/nifi/processors/ignite/cache/TestPutIgniteCache.java @@ -0,0 +1,581 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.ignite.cache; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.ignite.Ignite; +import org.apache.ignite.Ignition; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestPutIgniteCache { + + private static final String CACHE_NAME = "testCache"; + private TestRunner runner; + private PutIgniteCache putIgniteCache; + private Map properties1; + private Map properties2; + private static Ignite ignite; + + @BeforeClass + public static void setUpClass() { + ignite = Ignition.start("test-ignite.xml"); + + } + + @AfterClass + public static void tearDownClass() { + ignite.close(); + Ignition.stop(true); + } + + @Before + public void setUp() throws IOException { + putIgniteCache = new PutIgniteCache() { + @Override + protected Ignite getIgnite() { + return TestPutIgniteCache.ignite; + } + + }; + properties1 = new HashMap(); + properties1.put("igniteKey", "key1"); + properties2 = new HashMap(); + properties2.put("igniteKey", "key2"); + } + + @After + public void teardown() { + runner = null; + ignite.destroyCache(CACHE_NAME); + } + + @Test + public void testPutIgniteCacheOnTriggerDefaultConfigurationOneFlowFileWithPlainKey() throws IOException, InterruptedException { + + runner = TestRunners.newTestRunner(putIgniteCache); + runner.setProperty(PutIgniteCache.BATCH_SIZE, "5"); + runner.setProperty(PutIgniteCache.CACHE_NAME, CACHE_NAME); + runner.setProperty(PutIgniteCache.DATA_STREAMER_PER_NODE_BUFFER_SIZE, "1"); + runner.setProperty(PutIgniteCache.IGNITE_CACHE_ENTRY_KEY, "mykey"); + + runner.assertValid(); + runner.enqueue("test".getBytes(),properties1); + runner.run(1, false, true); + + runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 1); + List sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS); + assertEquals(1, sucessfulFlowFiles.size()); + List failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE); + assertEquals(0, failureFlowFiles.size()); + + final MockFlowFile out = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS).get(0); + + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER, "0"); + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "1"); + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT, "1"); + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0"); + + out.assertContentEquals("test".getBytes()); + Assert.assertArrayEquals("test".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("mykey")); + runner.shutdown(); + } + + @Test + public void testPutIgniteCacheOnTriggerDefaultConfigurationOneFlowFile() throws IOException, InterruptedException { + + runner = TestRunners.newTestRunner(putIgniteCache); + runner.setProperty(PutIgniteCache.BATCH_SIZE, "5"); + runner.setProperty(PutIgniteCache.CACHE_NAME, CACHE_NAME); + runner.setProperty(PutIgniteCache.DATA_STREAMER_PER_NODE_BUFFER_SIZE, "1"); + runner.setProperty(PutIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + runner.assertValid(); + runner.enqueue("test".getBytes(),properties1); + runner.run(1, false, true); + + runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 1); + List sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS); + assertEquals(1, sucessfulFlowFiles.size()); + List failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE); + assertEquals(0, failureFlowFiles.size()); + + final MockFlowFile out = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS).get(0); + + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER, "0"); + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "1"); + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT, "1"); + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0"); + + out.assertContentEquals("test".getBytes()); + Assert.assertArrayEquals("test".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key1")); + runner.shutdown(); + } + + @Test + public void testPutIgniteCacheOnTriggerDefaultConfigurationTwoFlowFilesAllowOverrideDefaultFalse() throws IOException, InterruptedException { + + runner = TestRunners.newTestRunner(putIgniteCache); + runner.setProperty(PutIgniteCache.BATCH_SIZE, "5"); + runner.setProperty(PutIgniteCache.CACHE_NAME, CACHE_NAME); + runner.setProperty(PutIgniteCache.DATA_STREAMER_PER_NODE_BUFFER_SIZE, "1"); + runner.setProperty(PutIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + runner.assertValid(); + runner.enqueue("test1".getBytes(),properties1); + runner.enqueue("test2".getBytes(),properties1); + runner.run(1, false, true); + + runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 2); + + List sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS); + assertEquals(2, sucessfulFlowFiles.size()); + List failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE); + assertEquals(0, failureFlowFiles.size()); + + final MockFlowFile out1 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS).get(0); + + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER, "0"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "2"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT, "2"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0"); + + out1.assertContentEquals("test1".getBytes()); + Assert.assertEquals("test1",new String(putIgniteCache.getIgniteCache().get("key1"))); + + final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS).get(1); + + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER, "1"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "2"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "1"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT, "2"); + + out2.assertContentEquals("test2".getBytes()); + Assert.assertArrayEquals("test1".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key1")); + + runner.shutdown(); + } + + @Test + public void testPutIgniteCacheOnTriggerDefaultConfigurationTwoFlowFilesAllowOverrideTrue() throws IOException, InterruptedException { + + runner = TestRunners.newTestRunner(putIgniteCache); + runner.setProperty(PutIgniteCache.BATCH_SIZE, "5"); + runner.setProperty(PutIgniteCache.CACHE_NAME, CACHE_NAME); + runner.setProperty(PutIgniteCache.DATA_STREAMER_PER_NODE_BUFFER_SIZE, "1"); + runner.setProperty(PutIgniteCache.DATA_STREAMER_ALLOW_OVERRIDE, "true"); + runner.setProperty(PutIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + runner.assertValid(); + runner.enqueue("test1".getBytes(),properties1); + runner.enqueue("test2".getBytes(),properties1); + runner.run(1, false, true); + + runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 2); + + List sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS); + assertEquals(2, sucessfulFlowFiles.size()); + List failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE); + assertEquals(0, failureFlowFiles.size()); + + final MockFlowFile out1 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS).get(0); + + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER, "0"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "2"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT, "2"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0"); + + out1.assertContentEquals("test1".getBytes()); + Assert.assertEquals("test2",new String(putIgniteCache.getIgniteCache().get("key1"))); + + final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS).get(1); + + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER, "1"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "2"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "1"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT, "2"); + + out2.assertContentEquals("test2".getBytes()); + + Assert.assertArrayEquals("test2".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key1")); + + runner.shutdown(); + } + + @Test + public void testPutIgniteCacheOnTriggerDefaultConfigurationOneFlowFileNoKey() throws IOException, InterruptedException { + + runner = TestRunners.newTestRunner(putIgniteCache); + runner.setProperty(PutIgniteCache.BATCH_SIZE, "5"); + runner.setProperty(PutIgniteCache.CACHE_NAME, CACHE_NAME); + runner.setProperty(PutIgniteCache.DATA_STREAMER_PER_NODE_BUFFER_SIZE, "1"); + runner.setProperty(PutIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + runner.assertValid(); + properties1.clear(); + runner.enqueue("test".getBytes(),properties1); + runner.run(1, false, true); + + runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_FAILURE, 1); + List sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS); + assertEquals(0, sucessfulFlowFiles.size()); + List failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE); + assertEquals(1, failureFlowFiles.size()); + + final MockFlowFile out = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE).get(0); + + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_ITEM_NUMBER, "0"); + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "1"); + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_COUNT, "1"); + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0"); + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_MISSING_KEY_MESSAGE); + + out.assertContentEquals("test".getBytes()); + assertNull((byte[])putIgniteCache.getIgniteCache().get("key1")); + runner.shutdown(); + } + + @Test + public void testPutIgniteCacheOnTriggerDefaultConfigurationOneFlowFileNoBytes() throws IOException, InterruptedException { + + runner = TestRunners.newTestRunner(putIgniteCache); + runner.setProperty(PutIgniteCache.BATCH_SIZE, "5"); + runner.setProperty(PutIgniteCache.CACHE_NAME, CACHE_NAME); + runner.setProperty(PutIgniteCache.DATA_STREAMER_PER_NODE_BUFFER_SIZE, "1"); + runner.setProperty(PutIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + runner.assertValid(); + runner.enqueue("".getBytes(),properties1); + runner.run(1, false, true); + + runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_FAILURE, 1); + List sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS); + assertEquals(0, sucessfulFlowFiles.size()); + List failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE); + assertEquals(1, failureFlowFiles.size()); + + final MockFlowFile out = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE).get(0); + + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_ITEM_NUMBER, "0"); + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "1"); + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_COUNT, "1"); + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0"); + out.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_ZERO_SIZE_MESSAGE); + + out.assertContentEquals("".getBytes()); + assertNull((byte[])putIgniteCache.getIgniteCache().get("key1")); + runner.shutdown(); + } + + @Test + public void testPutIgniteCacheOnTriggerDefaultConfigurationTwoFlowFiles() throws IOException, InterruptedException { + + runner = TestRunners.newTestRunner(putIgniteCache); + runner.setProperty(PutIgniteCache.BATCH_SIZE, "5"); + runner.setProperty(PutIgniteCache.CACHE_NAME, CACHE_NAME); + runner.setProperty(PutIgniteCache.DATA_STREAMER_PER_NODE_BUFFER_SIZE, "1"); + runner.setProperty(PutIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + runner.assertValid(); + runner.enqueue("test1".getBytes(),properties1); + runner.enqueue("test2".getBytes(),properties2); + runner.run(1, false, true); + + runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_SUCCESS, 2); + List sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS); + assertEquals(2, sucessfulFlowFiles.size()); + List failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE); + assertEquals(0, failureFlowFiles.size()); + + final MockFlowFile out1 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS).get(0); + + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER, "0"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "2"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT, "2"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0"); + + out1.assertContentEquals("test1".getBytes()); + Assert.assertArrayEquals("test1".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key1")); + + final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS).get(1); + + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER, "1"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "2"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "1"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT, "2"); + + out2.assertContentEquals("test2".getBytes()); + Assert.assertArrayEquals("test2".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key2")); + + runner.shutdown(); + + } + + @Test + public void testPutIgniteCacheOnTriggerDefaultConfigurationTwoFlowFilesNoKey() throws IOException, InterruptedException { + + runner = TestRunners.newTestRunner(putIgniteCache); + runner.setProperty(PutIgniteCache.BATCH_SIZE, "5"); + runner.setProperty(PutIgniteCache.CACHE_NAME, CACHE_NAME); + runner.setProperty(PutIgniteCache.DATA_STREAMER_PER_NODE_BUFFER_SIZE, "1"); + runner.setProperty(PutIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + runner.assertValid(); + + runner.enqueue("test1".getBytes()); + runner.enqueue("test2".getBytes()); + runner.run(1, false, true); + + runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_FAILURE, 2); + List sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS); + assertEquals(0, sucessfulFlowFiles.size()); + List failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE); + assertEquals(2, failureFlowFiles.size()); + + final MockFlowFile out1 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE).get(0); + + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_ITEM_NUMBER, "0"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "2"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_COUNT, "2"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_MISSING_KEY_MESSAGE); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0"); + + out1.assertContentEquals("test1".getBytes()); + assertNull((byte[])putIgniteCache.getIgniteCache().get("key1")); + + final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE).get(1); + + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_ITEM_NUMBER, "1"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "2"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_COUNT, "2"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_MISSING_KEY_MESSAGE); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "1"); + + out2.assertContentEquals("test2".getBytes()); + assertNull((byte[])putIgniteCache.getIgniteCache().get("key2")); + + runner.shutdown(); + + } + + @Test + public void testPutIgniteCacheOnTriggerDefaultConfigurationTwoFlowFileFirstNoKey() throws IOException, InterruptedException { + + runner = TestRunners.newTestRunner(putIgniteCache); + runner.setProperty(PutIgniteCache.BATCH_SIZE, "5"); + runner.setProperty(PutIgniteCache.CACHE_NAME, CACHE_NAME); + runner.setProperty(PutIgniteCache.DATA_STREAMER_PER_NODE_BUFFER_SIZE, "1"); + runner.setProperty(PutIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + runner.assertValid(); + runner.enqueue("test1".getBytes()); + runner.enqueue("test2".getBytes(),properties2); + runner.run(1, false, true); + + List sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS); + assertEquals(1, sucessfulFlowFiles.size()); + List failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE); + assertEquals(1, failureFlowFiles.size()); + + final MockFlowFile out1 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE).get(0); + + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_ITEM_NUMBER, "0"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "2"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_COUNT, "1"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_MISSING_KEY_MESSAGE); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0"); + + out1.assertContentEquals("test1".getBytes()); + assertNull((byte[])putIgniteCache.getIgniteCache().get("key1")); + + final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS).get(0); + + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER, "0"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "2"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT, "1"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "1"); + + out2.assertContentEquals("test2".getBytes()); + Assert.assertArrayEquals("test2".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key2")); + + runner.shutdown(); + + } + + @Test + public void testPutIgniteCacheOnTriggerDefaultConfigurationTwoFlowFileSecondNoKey() throws IOException, InterruptedException { + + runner = TestRunners.newTestRunner(putIgniteCache); + runner.setProperty(PutIgniteCache.BATCH_SIZE, "5"); + runner.setProperty(PutIgniteCache.CACHE_NAME, CACHE_NAME); + runner.setProperty(PutIgniteCache.DATA_STREAMER_PER_NODE_BUFFER_SIZE, "1"); + runner.setProperty(PutIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + runner.assertValid(); + + runner.enqueue("test1".getBytes(),properties1); + runner.enqueue("test2".getBytes()); + runner.run(1, false, true); + + List sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS); + assertEquals(1, sucessfulFlowFiles.size()); + List failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE); + assertEquals(1, failureFlowFiles.size()); + + final MockFlowFile out1 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS).get(0); + + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER, "0"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "2"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT, "1"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0"); + + out1.assertContentEquals("test1".getBytes()); + Assert.assertArrayEquals("test1".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key1")); + + final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE).get(0); + + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_ITEM_NUMBER, "0"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "2"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_COUNT, "1"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_MISSING_KEY_MESSAGE); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "1"); + + out2.assertContentEquals("test2".getBytes()); + assertNull((byte[])putIgniteCache.getIgniteCache().get("key2")); + + + runner.shutdown(); + + } + + @Test + public void testPutIgniteCacheOnTriggerDefaultConfigurationTwoFlowFilesOneNoKeyOneNoBytes() throws IOException, InterruptedException { + + runner = TestRunners.newTestRunner(putIgniteCache); + runner.setProperty(PutIgniteCache.BATCH_SIZE, "5"); + runner.setProperty(PutIgniteCache.CACHE_NAME, CACHE_NAME); + runner.setProperty(PutIgniteCache.DATA_STREAMER_PER_NODE_BUFFER_SIZE, "1"); + runner.setProperty(PutIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + runner.assertValid(); + + runner.enqueue("test1".getBytes()); + runner.enqueue("".getBytes(),properties2); + runner.run(1, false, true); + + runner.assertAllFlowFilesTransferred(PutIgniteCache.REL_FAILURE, 2); + List sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS); + assertEquals(0, sucessfulFlowFiles.size()); + List failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE); + assertEquals(2, failureFlowFiles.size()); + + final MockFlowFile out1 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE).get(0); + + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_ITEM_NUMBER, "0"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "2"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_COUNT, "2"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_MISSING_KEY_MESSAGE); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0"); + + out1.assertContentEquals("test1".getBytes()); + assertNull((byte[])putIgniteCache.getIgniteCache().get("key1")); + + final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE).get(1); + + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_ITEM_NUMBER, "1"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "2"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_COUNT, "2"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_ZERO_SIZE_MESSAGE); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "1"); + + out2.assertContentEquals("".getBytes()); + assertNull((byte[])putIgniteCache.getIgniteCache().get("key2")); + + runner.shutdown(); + + } + + @Test + public void testPutIgniteCacheOnTriggerDefaultConfigurationTwoFlowFilesOneNoKeySecondOkThirdNoBytes() throws IOException, InterruptedException { + + runner = TestRunners.newTestRunner(putIgniteCache); + runner.setProperty(PutIgniteCache.BATCH_SIZE, "5"); + runner.setProperty(PutIgniteCache.CACHE_NAME, CACHE_NAME); + runner.setProperty(PutIgniteCache.DATA_STREAMER_PER_NODE_BUFFER_SIZE, "1"); + runner.setProperty(PutIgniteCache.IGNITE_CACHE_ENTRY_KEY, "${igniteKey}"); + + runner.assertValid(); + runner.enqueue("test1".getBytes()); + runner.enqueue("test2".getBytes(),properties1); + runner.enqueue("".getBytes(),properties2); + runner.run(1, false, true); + + List sucessfulFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS); + assertEquals(1, sucessfulFlowFiles.size()); + List failureFlowFiles = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE); + assertEquals(2, failureFlowFiles.size()); + + final MockFlowFile out1 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE).get(0); + + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_ITEM_NUMBER, "0"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "3"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_COUNT, "2"); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_MISSING_KEY_MESSAGE); + out1.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "0"); + + out1.assertContentEquals("test1".getBytes()); + assertEquals("test2", new String(putIgniteCache.getIgniteCache().get("key1"))); + + final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_SUCCESS).get(0); + + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_ITEM_NUMBER, "0"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "3"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "1"); + out2.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_SUCCESSFUL_COUNT, "1"); + + out2.assertContentEquals("test2".getBytes()); + Assert.assertArrayEquals("test2".getBytes(),(byte[])putIgniteCache.getIgniteCache().get("key1")); + + final MockFlowFile out3 = runner.getFlowFilesForRelationship(PutIgniteCache.REL_FAILURE).get(1); + + out3.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_ITEM_NUMBER, "1"); + out3.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_TOTAL_COUNT, "3"); + out3.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_COUNT, "2"); + out3.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_REASON_ATTRIBUTE_KEY, PutIgniteCache.IGNITE_BATCH_FLOW_FILE_FAILED_ZERO_SIZE_MESSAGE); + out3.assertAttributeEquals(PutIgniteCache.IGNITE_BATCH_FLOW_FILE_ITEM_NUMBER, "2"); + + out3.assertContentEquals("".getBytes()); + assertNull((byte[])putIgniteCache.getIgniteCache().get("key2")); + + runner.shutdown(); + + } +} diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/resources/test-default.xml b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/resources/test-default.xml new file mode 100644 index 0000000000..240a72d123 --- /dev/null +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/resources/test-default.xml @@ -0,0 +1,44 @@ + + + + + + + + + + + + + + 127.0.0.1:47500..47509 + + + + + + + + diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/resources/test-ignite.xml b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/resources/test-ignite.xml new file mode 100644 index 0000000000..d8ec77e9bc --- /dev/null +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/test/resources/test-ignite.xml @@ -0,0 +1,26 @@ + + + + + + + + diff --git a/nifi-nar-bundles/nifi-ignite-bundle/pom.xml b/nifi-nar-bundles/nifi-ignite-bundle/pom.xml new file mode 100644 index 0000000000..f142f0de66 --- /dev/null +++ b/nifi-nar-bundles/nifi-ignite-bundle/pom.xml @@ -0,0 +1,43 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-nar-bundles + 1.0.0-SNAPSHOT + + + nifi-ignite-bundle + pom + + + nifi-ignite-processors + nifi-ignite-nar + + + + + + org.apache.nifi + nifi-ignite-processors + 1.0.0-SNAPSHOT + + + + + diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 945002f269..2a38a30d53 100644 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -67,6 +67,7 @@ nifi-slack-bundle nifi-snmp-bundle nifi-windows-event-log-bundle + nifi-ignite-bundle diff --git a/pom.xml b/pom.xml index 43455b8d31..368c4c8d7e 100644 --- a/pom.xml +++ b/pom.xml @@ -227,6 +227,21 @@ language governing permissions and limitations under the License. --> swagger-annotations 1.5.3-M1 + + org.apache.ignite + ignite-core + 1.6.0 + + + org.apache.ignite + ignite-spring + 1.6.0 + + + org.apache.ignite + ignite-log4j2 + 1.6.0 + commons-codec commons-codec @@ -969,6 +984,12 @@ language governing permissions and limitations under the License. --> 1.0.0-SNAPSHOT nar + + org.apache.nifi + nifi-ignite-nar + 1.0.0-SNAPSHOT + nar + org.apache.nifi nifi-http-context-map-nar