diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 0bf722b121..e40fd7b96a 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -267,6 +267,11 @@ language governing permissions and limitations under the License. --> nifi-scripting-nar nar + + org.apache.nifi + nifi-elasticsearch-nar + nar + diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/pom.xml index 44492a5f55..a5002ddde4 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/pom.xml +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/pom.xml @@ -6,10 +6,10 @@ nifi-elasticsearch-bundle org.apache.nifi - 0.4.1-SNAPSHOT + 0.4.2-SNAPSHOT - gov.pnnl.nifi + org.apache.nifi nifi-elasticsearch-nar nar @@ -17,7 +17,7 @@ org.apache.nifi nifi-elasticsearch-processors - 0.4.1-SNAPSHOT + ${project.version} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..f8e3415d65 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,212 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. 
For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for these +subcomponents is subject to the terms and conditions of the following +licenses. + +The binary distribution of this product bundles 'Woodstox StAX 2 API' which is + "licensed under standard BSD license" diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/src/main/resources/META-INF/NOTICE.txt b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/src/main/resources/META-INF/NOTICE similarity index 100% rename from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/src/main/resources/META-INF/NOTICE.txt rename to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-nar/src/main/resources/META-INF/NOTICE diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml index 3208ced1d3..3b5e6005e3 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml @@ -6,7 +6,7 @@ nifi-elasticsearch-bundle org.apache.nifi - 0.4.1-SNAPSHOT + 0.4.2-SNAPSHOT org.apache.nifi @@ -14,7 +14,7 @@ 1.7.12 - 1.7.1 + 2.1.0 2.4 2.9.1 @@ -28,6 +28,11 
@@ org.apache.nifi nifi-processor-utils + + org.apache.lucene + lucene-core + ${lucene.version} + org.apache.nifi nifi-mock diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticSearchProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticSearchProcessor.java deleted file mode 100644 index 6bfd37f44b..0000000000 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticSearchProcessor.java +++ /dev/null @@ -1,232 +0,0 @@ -package org.apache.nifi.processors.elasticsearch; - -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.util.StandardValidators; -import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Locale; - -import org.joda.time.DateTime; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; -import org.joda.time.format.ISODateTimeFormat; - -import com.google.gson.*; - -public abstract class AbstractElasticSearchProcessor extends AbstractProcessor{ - - protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder() - .name("Cluster Name") - .description("Name of the ES cluster. 
For example, elasticsearch_brew") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - protected static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder() - .name("ElasticSearch Hosts") - .description("ElasticSearch Hosts, which should be comma separated and colon for hostname/port " + - "host1:port,host2:port,.... For example testcluster:9300") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder() - .name("ElasticSearch Ping Timeout") - .description("The ping timeout used to determine when a node is unreachable. " + - "For example, 5s (5 seconds). If non-local recommended is 30s") - .required(true) - .defaultValue("5s") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder() - .name("Sampler Interval") - .description("Node sampler interval. For example, 5s (5 seconds) If non-local recommended is 30s") - .required(true) - .defaultValue("5s") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - protected static final PropertyDescriptor INDEX_STRATEGY = new PropertyDescriptor.Builder() - .name("Index Strategy") - .description("Pick the index strategy. 
Yearly, Monthly, Daily, Hourly") - .required(true) - .defaultValue("Monthly") - .allowableValues("Yearly", "Monthly", "Daily", "Hourly") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - protected TransportClient esClient; - protected List esHosts; - protected String indexPrefix; - protected static String indexStrategy; - - /** - * Instantiate ElasticSearch Client - * @param context - * @throws IOException - */ - @OnScheduled - public final void createClient(ProcessContext context) throws IOException { - if (esClient != null) { - closeClient(); - } - - getLogger().info("Creating ElasticSearch Client"); - - try { - - final String clusterName = context.getProperty(CLUSTER_NAME).toString(); - final String pingTimeout = context.getProperty(PING_TIMEOUT).toString(); - final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).toString(); - indexStrategy = context.getProperty(INDEX_STRATEGY).toString(); - - //create new transport client - esClient = new TransportClient( - ImmutableSettings.builder() - .put("cluster.name", clusterName) - .put("client.transport.ping_timeout", pingTimeout) - .put("client.transport.nodes_sampler_interval", samplerInterval), - false); - - - final String hosts = context.getProperty(HOSTS).toString(); - esHosts = GetEsHosts(hosts); - - for (final InetSocketAddress host : esHosts) { - esClient.addTransportAddress(new InetSocketTransportAddress(host)); - } - } catch (Exception e) { - getLogger().error("Failed to schedule PutElasticSearch due to {}", new Object[] { e }, e); - throw e; - } - } - - /** - * Dispose of ElasticSearch client - */ - @OnStopped - public final void closeClient() { - if (esClient != null) { - getLogger().info("Closing ElasticSearch Client"); - esClient.close(); - esClient = null; - } - } - - /** - * Get the ElasticSearch hosts from the Nifi attribute - * @param hosts A comma separted list of ElasticSearch hosts - * @return List of InetSockeAddresses for the ES hosts - */ - private List 
GetEsHosts(String hosts){ - - final List esList = Arrays.asList(hosts.split(",")); - List esHosts = new ArrayList<>(); - - for(String item : esList){ - - String[] addresses = item.split(":"); - final String hostName = addresses[0]; - final int port = Integer.parseInt(addresses[1]); - - esHosts.add(new InetSocketAddress(hostName, port)); - } - - return esHosts; - - } - - - /** - * Get ElasticSearch index for data - * @param input - * @return - */ - public String getIndex(final JsonObject input) { - - return extractIndexString(input); - } - - /** - * Get ElasticSearch Type - * @param input - * @return - */ - public String getType(final JsonObject input) { - return "status"; - } - - /** - * Get id for ElasticSearch - * @param input - * @return - */ - public String getId(final JsonObject input) { - - return input.get("id").getAsString(); - } - - /** - * Get Source for ElasticSearch - * @param input - * @return - */ - public byte[] getSource(final JsonObject input) { - String jsonString = input.toString(); - jsonString = jsonString.replace("\r\n", " ").replace('\n', ' ').replace('\r', ' '); - return jsonString.getBytes(StandardCharsets.UTF_8); - } - - /** - * Identify ElasticSearch index where data will land - * @param parsedJson - * @return - */ - private static String extractIndexString(final JsonObject parsedJson) { - final String extractedDate = "created_at"; - if(!parsedJson.has(extractedDate)) - throw new IllegalArgumentException("Message is missing " + extractedDate); - - final DateTimeFormatter format = - DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy").withLocale(Locale.ENGLISH); - - final String dateElement = parsedJson.get(extractedDate).getAsString(); - final DateTimeFormatter isoFormat = ISODateTimeFormat.dateTime(); - final DateTime dateTime = isoFormat.parseDateTime(format.parseDateTime(dateElement).toString()); - - final DateTimeFormatter dateFormat; - //Create ElasticSearch Index - switch (indexStrategy){ - - case "Yearly": - dateFormat = 
DateTimeFormat.forPattern("yyyy_MM"); - break; - case "Monthly": - dateFormat = DateTimeFormat.forPattern("yyyy_MM"); - break; - case "Daily": - dateFormat = DateTimeFormat.forPattern("yyyy_MM_dd"); - break; - case "Hourly": - dateFormat = DateTimeFormat.forPattern("yyyy_MM_dd_HH"); - break; - default: - throw new IllegalArgumentException("Invalid index strategy selected: " + indexStrategy); - - } - - //ElasticSearch indexes must be lowercase - final String strategy = indexStrategy.toLowerCase() + "_" + dateFormat.print(dateTime); - return strategy; - } -} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java new file mode 100644 index 0000000000..95731781f2 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.elasticsearch; + +import com.google.gson.JsonObject; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.elasticsearch.node.NodeBuilder; + +public abstract class AbstractElasticsearchProcessor extends AbstractProcessor { + + protected static final AllowableValue TRANSPORT_CLIENT = + new AllowableValue("transport", "Transport", + "Specifies a Transport Client be used to connect to the Elasticsearch cluster. A Transport " + + "client does not join the cluster, and is better for a large number of connections " + + "and/or if the NiFi node(s) and Elasticsearch nodes are mostly isolated via firewall."); + + protected static final AllowableValue NODE_CLIENT = + new AllowableValue("node", "Node", + "Specifies a Node Client be used to connect to the Elasticsearch cluster. 
This client joins the " + + "cluster, so operations are performed more quickly, but the NiFi node may need to be " + + "configured such that it can successfully join the Elasticsearch cluster"); + + protected static final PropertyDescriptor CLIENT_TYPE = new PropertyDescriptor.Builder() + .name("Client type") + .description("The type of client used to connect to the Elasticsearch cluster. Transport client is more " + + "isolated and lighter-weight, and Node client is faster and more integrated into the ES cluster") + .required(true) + .allowableValues(TRANSPORT_CLIENT, NODE_CLIENT) + .defaultValue(TRANSPORT_CLIENT.getValue()) + .addValidator(Validator.VALID) + .build(); + + protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder() + .name("Cluster Name") + .description("Name of the ES cluster (for example, elasticsearch_brew). Defaults to 'elasticsearch'") + .required(false) + .addValidator(Validator.VALID) + .build(); + protected static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder() + .name("ElasticSearch Hosts") + .description("ElasticSearch Hosts, which should be comma separated and colon for hostname/port " + + "host1:port,host2:port,.... For example testcluster:9300. Note that this property is only " + + "needed when using a Transport client, it is ignored when using a Node client") + .required(false) + .addValidator(new ElasticsearchClientValidator()) + .build(); + protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder() + .name("ElasticSearch Ping Timeout") + .description("The ping timeout used to determine when a node is unreachable. " + + "For example, 5s (5 seconds). If non-local recommended is 30s") + .required(true) + .defaultValue("5s") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder() + .name("Sampler Interval") + .description("Node sampler interval. 
For example, 5s (5 seconds) If non-local recommended is 30s") + .required(true) + .defaultValue("5s") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + + protected Client esClient; + protected List<InetSocketAddress> esHosts; + + /** + * Instantiate ElasticSearch Client, closing any client left over from a previous run + * + * @param context the process context from which the client type, cluster name, ping timeout, sampler interval and hosts properties are read + * @throws IOException declared for callers; any exception raised while creating the client is logged and rethrown + */ + @OnScheduled + public void createClient(ProcessContext context) throws IOException { + + ProcessorLog log = getLogger(); + if (esClient != null) { + closeClient(); + } + + log.info("Creating ElasticSearch Client"); + + try { + final String clusterType = context.getProperty(CLIENT_TYPE).toString(); + final String clusterName = context.getProperty(CLUSTER_NAME).toString(); + final String pingTimeout = context.getProperty(PING_TIMEOUT).toString(); + final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).toString(); + + if (TRANSPORT_CLIENT.getValue().equals(clusterType)) { + + //create new transport client + Settings settings = Settings.settingsBuilder() + .put("cluster.name", clusterName) + .put("client.transport.ping_timeout", pingTimeout) + .put("client.transport.nodes_sampler_interval", samplerInterval) + .build(); + + TransportClient transportClient = TransportClient.builder().settings(settings).build(); + + final String hosts = context.getProperty(HOSTS).toString(); + esHosts = GetEsHosts(hosts); + + if (esHosts != null) { + for (final InetSocketAddress host : esHosts) { + transportClient.addTransportAddress(new InetSocketTransportAddress(host)); + } + } + esClient = transportClient; + } else if (NODE_CLIENT.getValue().equals(clusterType)) { + esClient = NodeBuilder.nodeBuilder().clusterName(clusterName).node().client(); + } + } catch (Exception e) { + log.error("Failed to create Elasticsearch client due to {}", new Object[]{e}, e); + throw e; + } + } + + /** + * Dispose of ElasticSearch client + */ + @OnStopped + public final void closeClient() { + if (esClient != null) { + getLogger().info("Closing ElasticSearch Client"); + esClient.close(); + esClient = 
null; + } + } + + /** + * Get the ElasticSearch hosts from a NiFi property value, e.g. host1:9300,host2:9300 (whitespace around each host and port is ignored) + * + * @param hosts A comma-separated list of ElasticSearch hosts (host:port,host2:port2, etc.) + * @return List of InetSocketAddresses for the ES hosts, or null if hosts is null + */ + private List<InetSocketAddress> GetEsHosts(String hosts) { + + if (hosts == null) { + return null; + } + final List<String> esList = Arrays.asList(hosts.split(",")); + List<InetSocketAddress> esHosts = new ArrayList<>(); + + for (String item : esList) { + + String[] addresses = item.split(":"); + final String hostName = addresses[0].trim(); + final int port = Integer.parseInt(addresses[1].trim()); + + esHosts.add(new InetSocketAddress(hostName, port)); + } + + return esHosts; + + } + + /** + * Get Source for ElasticSearch. The string representation of the JSON object is returned as a byte array after + * replacing newlines with spaces + * + * @param input a JSON object to be serialized to UTF-8 + * @return a byte array containing the UTF-8 representation (without newlines) of the JSON object + */ + public byte[] getSource(final JsonObject input) { + String jsonString = input.toString(); + jsonString = jsonString.replace("\r\n", " ").replace('\n', ' ').replace('\r', ' '); + return jsonString.getBytes(StandardCharsets.UTF_8); + } + + /** + * A custom validator for the Elasticsearch properties list. For example, the hostnames property doesn't need to + * be filled in for a Node client, as it joins the cluster by name. 
Alternatively if a Transport client is used, the hosts value itself must be non-empty. + */ + protected static class ElasticsearchClientValidator implements Validator { + + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + // Only require a non-empty hosts value when the client type is Transport + if (HOSTS.getName().equals(subject)) { + PropertyValue clientTypeProperty = context.getProperty(CLIENT_TYPE); + if (TRANSPORT_CLIENT.getValue().equals(clientTypeProperty.getValue())) { + return StandardValidators.NON_EMPTY_VALIDATOR.validate( + subject, input, context); + } + } + return VALID.validate(subject, input, context); + } + } + +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticSearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java similarity index 54% rename from nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticSearch.java rename to nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java index 9740d39dd5..003aa30005 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticSearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java @@ -1,9 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.processors.elasticsearch; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.ProcessContext; @@ -12,38 +31,67 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.elasticsearch.AbstractElasticSearchProcessor; -import org.apache.nifi.stream.io.StreamUtils; +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.client.transport.NoNodeAvailableException; -import org.elasticsearch.*; import org.elasticsearch.transport.ReceiveTimeoutTransportException; import java.io.IOException; import 
java.io.InputStream; -import java.util.*; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; -import com.google.gson.*; @EventDriven @Tags({"elasticsearch", "insert", "update", "write", "put"}) -@CapabilityDescription("Writes the contents of a FlowFile to ElasticSearch") -public class PutElasticSearch extends AbstractElasticSearchProcessor { +@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch") +public class PutElasticsearch extends AbstractElasticsearchProcessor { static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") - .description("All FlowFiles that are written to ElasticSearch are routed to this relationship").build(); + .description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build(); static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") - .description("All FlowFiles that cannot be written to ElasticSearch are routed to this relationship").build(); + .description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build(); static final Relationship REL_RETRY = new Relationship.Builder() .name("retry") .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") .build(); + public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("Identifier attribute") + .description("The name of the attribute containing the identifier for each FlowFile") + .required(true) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder() + .name("Index") + .description("The name of the index to insert into") + .required(true) + .expressionLanguageSupported(true) + 
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator( + AttributeExpression.ResultType.STRING, true)) + .build(); + + public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder() + .name("Type") + .description("The type of this document (used by Elasticsearch for indexing and searching)") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator( + AttributeExpression.ResultType.STRING, true)) + .build(); + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() .name("Batch Size") .description("The preferred number of FlowFiles to put to the database in a single transaction") @@ -52,40 +100,40 @@ public class PutElasticSearch extends AbstractElasticSearchProcessor { .defaultValue("100") .build(); - private final List descriptors; - - private final Set relationships; - - public PutElasticSearch() { - final List descriptors = new ArrayList<>(); - descriptors.add(CLUSTER_NAME); - descriptors.add(HOSTS); - descriptors.add(PING_TIMEOUT); - descriptors.add(SAMPLER_INTERVAL); - descriptors.add(BATCH_SIZE); - descriptors.add(INDEX_STRATEGY); - this.descriptors = Collections.unmodifiableList(descriptors); + @Override + public Set getRelationships() { final Set relationships = new HashSet<>(); relationships.add(REL_SUCCESS); relationships.add(REL_FAILURE); relationships.add(REL_RETRY); - this.relationships = Collections.unmodifiableSet(relationships); - } - - @Override - public Set getRelationships() { - return this.relationships; + return Collections.unmodifiableSet(relationships); } @Override public final List getSupportedPropertyDescriptors() { - return descriptors; + final List descriptors = new ArrayList<>(); + descriptors.add(CLIENT_TYPE); + descriptors.add(CLUSTER_NAME); + descriptors.add(HOSTS); + descriptors.add(PING_TIMEOUT); + descriptors.add(SAMPLER_INTERVAL); + descriptors.add(ID_ATTRIBUTE); + descriptors.add(INDEX); + 
descriptors.add(TYPE); + descriptors.add(BATCH_SIZE); + + return Collections.unmodifiableList(descriptors); } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + final String index = context.getProperty(INDEX).evaluateAttributeExpressions().getValue(); + final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue(); + final String docType = context.getProperty(TYPE).getValue(); + final List flowFiles = session.get(batchSize); if (flowFiles.isEmpty()) { return; @@ -94,13 +142,32 @@ public class PutElasticSearch extends AbstractElasticSearchProcessor { final ProcessorLog logger = getLogger(); try { - final BulkRequestBuilder bulk = GetEsBulkRequest(session, flowFiles); + final BulkRequestBuilder bulk = esClient.prepareBulk(); + for (FlowFile file : flowFiles) { + final String id = file.getAttribute(id_attribute); + if (id == null) { + getLogger().error("no value in identifier attribute {}", new Object[]{id_attribute}); + throw new ProcessException("No value in identifier attribute " + id_attribute); + } + session.read(file, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + + final InputStreamReader input = new InputStreamReader(in); + final JsonParser parser = new JsonParser(); + final JsonObject json = parser.parse(input).getAsJsonObject(); + bulk.add(esClient.prepareIndex(index, docType, id) + .setSource(getSource(json))); + } + }); + } + final BulkResponse response = bulk.execute().actionGet(); if (response.hasFailures()) { for (final BulkItemResponse item : response.getItems()) { final FlowFile flowFile = flowFiles.get(item.getItemId()); if (item.isFailed()) { - logger.error("Failed to insert {} into ElasticSearch due to {}", + logger.error("Failed to insert {} into Elasticsearch due to {}", new Object[]{flowFile, item.getFailure()}, new Exception()); 
session.transfer(flowFile, REL_FAILURE); @@ -117,36 +184,28 @@ public class PutElasticSearch extends AbstractElasticSearchProcessor { } catch (NoNodeAvailableException nne) { - logger.error("Failed to insert {} into ElasticSearch No Node Available {}", new Object[]{nne}, nne); + logger.error("Failed to insert {} into Elasticsearch No Node Available {}", new Object[]{nne}, nne); for (final FlowFile flowFile : flowFiles) { session.transfer(flowFile, REL_RETRY); } context.yield(); } catch (ElasticsearchTimeoutException ete) { - logger.error("Failed to insert {} into ElasticSearch Timeout to {}", new Object[]{ete}, ete); + logger.error("Failed to insert {} into Elasticsearch Timeout to {}", new Object[]{ete}, ete); for (final FlowFile flowFile : flowFiles) { session.transfer(flowFile, REL_RETRY); } context.yield(); } catch (ReceiveTimeoutTransportException rtt) { - logger.error("Failed to insert {} into ElasticSearch ReceiveTimeoutTransportException to {}", new Object[]{rtt}, rtt); + logger.error("Failed to insert {} into Elasticsearch ReceiveTimeoutTransportException to {}", new Object[]{rtt}, rtt); for (final FlowFile flowFile : flowFiles) { session.transfer(flowFile, REL_RETRY); } context.yield(); } catch (ElasticsearchParseException esp) { - logger.error("Failed to insert {} into ElasticSearch Parse Exception {}", new Object[]{esp}, esp); - - for (final FlowFile flowFile : flowFiles) { - session.transfer(flowFile, REL_FAILURE); - } - context.yield(); - - } catch (ElasticsearchException e) { - logger.error("Failed to insert {} into ElasticSearch due to {}", new Object[]{e}, e); + logger.error("Failed to insert {} into Elasticsearch Parse Exception {}", new Object[]{esp}, esp); for (final FlowFile flowFile : flowFiles) { session.transfer(flowFile, REL_FAILURE); @@ -154,7 +213,7 @@ public class PutElasticSearch extends AbstractElasticSearchProcessor { context.yield(); } catch (Exception e) { - logger.error("Failed to insert {} into ElasticSearch due to {}", new 
Object[]{e}, e); + logger.error("Failed to insert {} into Elasticsearch due to {}", new Object[]{e}, e); for (final FlowFile flowFile : flowFiles) { session.transfer(flowFile, REL_FAILURE); @@ -163,35 +222,4 @@ public class PutElasticSearch extends AbstractElasticSearchProcessor { } } - - /** - * Get the ES bulk request for the session - * - * @param session ProcessSession - * @param flowFiles Flowfiles pulled off of the queue to batch in - * @return BulkeRequestBuilder - */ - private BulkRequestBuilder GetEsBulkRequest(final ProcessSession session, final List flowFiles) { - - final BulkRequestBuilder bulk = esClient.prepareBulk(); - for (FlowFile file : flowFiles) { - final byte[] content = new byte[(int) file.getSize()]; - session.read(file, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, content, true); - - final String input = new String(content); - final JsonParser parser = new JsonParser(); - final JsonObject json = parser.parse(input).getAsJsonObject(); - bulk.add(esClient.prepareIndex(getIndex(json), getType(json), getId(json)) - .setSource(getSource(json))); - - } - - }); - - } - return bulk; - } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 7c14a424a3..f65af2feb4 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -12,4 +12,4 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.processors.elasticsearch.PutElasticSearch +org.apache.nifi.processors.elasticsearch.PutElasticsearch diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticSearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticSearch.java deleted file mode 100644 index c8b7ab65f2..0000000000 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticSearch.java +++ /dev/null @@ -1,103 +0,0 @@ -package org.apache.nifi.processors.elasticsearch; - -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.*; - - -import java.io.IOException; -import java.io.InputStream; - -public class TestPutElasticSearch { - - private InputStream twitterExample; - private TestRunner runner; - - @Before - public void setUp() throws IOException { - ClassLoader classloader = Thread.currentThread().getContextClassLoader(); - twitterExample = classloader - .getResourceAsStream("TweetExample.json"); - - } - - @After - public void teardown() { - runner = null; - - } - - - @Test - @Ignore("Comment this out if you want to run against local or test ES") - public void testPutElasticSearchBasic() throws IOException { - System.out.println("Starting test " + new Object() { - }.getClass().getEnclosingMethod().getName()); - final TestRunner runner = TestRunners.newTestRunner(new PutElasticSearch()); - runner.setValidateExpressionUsage(false); - //Local Cluster - Mac pulled from brew - runner.setProperty(AbstractElasticSearchProcessor.CLUSTER_NAME, 
"elasticsearch_brew"); - runner.setProperty(AbstractElasticSearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticSearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticSearchProcessor.SAMPLER_INTERVAL, "5s"); - runner.setProperty(AbstractElasticSearchProcessor.INDEX_STRATEGY, "Monthly"); - runner.setProperty(PutElasticSearch.BATCH_SIZE, "1"); - - - runner.enqueue(twitterExample); - runner.run(); - - runner.assertAllFlowFilesTransferred(PutElasticSearch.REL_SUCCESS, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticSearch.REL_SUCCESS).get(0); - - - } - - @Test - @Ignore("Comment this out if you want to run against local or test ES") - public void testPutElasticSearchBatch() throws IOException { - System.out.println("Starting test " + new Object() { - }.getClass().getEnclosingMethod().getName()); - final TestRunner runner = TestRunners.newTestRunner(new PutElasticSearch()); - runner.setValidateExpressionUsage(false); - //Local Cluster - Mac pulled from brew - runner.setProperty(AbstractElasticSearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); - runner.setProperty(AbstractElasticSearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticSearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticSearchProcessor.SAMPLER_INTERVAL, "5s"); - runner.setProperty(AbstractElasticSearchProcessor.INDEX_STRATEGY, "Monthly"); - runner.setProperty(PutElasticSearch.BATCH_SIZE, "100"); - - JsonParser parser = new JsonParser(); - JsonObject json; - String message = convertStreamToString(twitterExample); - for (int i = 0;i < 100; i++){ - - json = parser.parse(message).getAsJsonObject(); - String id = json.get("id").getAsString(); - long newId = Long.parseLong(id) + i; - json.addProperty("id", newId); - runner.enqueue(message.getBytes()); - - } - - runner.run(); - - runner.assertAllFlowFilesTransferred(PutElasticSearch.REL_SUCCESS, 100); - final MockFlowFile out = 
runner.getFlowFilesForRelationship(PutElasticSearch.REL_SUCCESS).get(0); - - } - - /** - * Convert an input stream to a stream - * @param is input the input stream - * @return return the converted input stream as a string - */ - static String convertStreamToString(java.io.InputStream is) { - java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A"); - return s.hasNext() ? s.next() : ""; - } -} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java new file mode 100644 index 0000000000..68a0a78e89 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.elasticsearch; + +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockProcessContext; +import org.apache.nifi.util.MockProcessorInitializationContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexAction; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.AbstractListenableActionFuture; +import org.elasticsearch.action.support.AdapterActionFuture; +import org.elasticsearch.action.support.PlainListenableActionFuture; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +public class TestPutElasticsearch { + + private InputStream twitterExample; + private TestRunner runner; + + @Before + public void setUp() throws IOException { + ClassLoader classloader = Thread.currentThread().getContextClassLoader(); + twitterExample = classloader + .getResourceAsStream("TweetExample.json"); + + } + + @After + public void 
teardown() { + runner = null; + } + + @Test + public void testPutElasticSearchOnTrigger() throws IOException { + runner = TestRunners.newTestRunner(new ElasticsearchTestProcessor(false)); // no failures + runner.setValidateExpressionUsage(false); + runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); + runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + + runner.setProperty(PutElasticsearch.INDEX, "tweet"); + runner.assertNotValid(); + runner.setProperty(PutElasticsearch.TYPE, "status"); + runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); + runner.assertNotValid(); + runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id"); + runner.assertValid(); + + runner.enqueue(twitterExample, new HashMap() {{ + put("tweet_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("tweet_id", "28039652140"); + } + + @Test + public void testPutElasticSearchOnTriggerWithFailures() throws IOException { + runner = TestRunners.newTestRunner(new ElasticsearchTestProcessor(true)); // simulate failures + runner.setValidateExpressionUsage(false); + runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); + runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(PutElasticsearch.INDEX, "tweet"); + runner.setProperty(PutElasticsearch.TYPE, "status"); + runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); + 
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id"); + + runner.enqueue(twitterExample, new HashMap() {{ + put("tweet_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_FAILURE, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_FAILURE).get(0); + assertNotNull(out); + out.assertAttributeEquals("tweet_id", "28039652140"); + } + + /** + * A Test class that extends the processor in order to inject/mock behavior + */ + private static class ElasticsearchTestProcessor extends PutElasticsearch { + boolean responseHasFailures = false; + + public ElasticsearchTestProcessor(boolean responseHasFailures) { + this.responseHasFailures = responseHasFailures; + } + + @Override + @OnScheduled + public void createClient(ProcessContext context) throws IOException { + esClient = mock(Client.class); + BulkRequestBuilder bulkRequestBuilder = spy(new BulkRequestBuilder(esClient, BulkAction.INSTANCE)); + doReturn(new MockBulkRequestBuilderExecutor(responseHasFailures)).when(bulkRequestBuilder).execute(); + when(esClient.prepareBulk()).thenReturn(bulkRequestBuilder); + + IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(esClient, IndexAction.INSTANCE); + when(esClient.prepareIndex(anyString(), anyString(), anyString())).thenReturn(indexRequestBuilder); + } + + private static class MockBulkRequestBuilderExecutor + extends AdapterActionFuture> + implements ListenableActionFuture { + + boolean responseHasFailures = false; + + public MockBulkRequestBuilderExecutor(boolean responseHasFailures) { + this.responseHasFailures = responseHasFailures; + } + + @Override + protected BulkResponse convert(ActionListener bulkResponseActionListener) { + return null; + } + + @Override + public void addListener(ActionListener actionListener) { + + } + + @Override + public BulkResponse get() throws InterruptedException, ExecutionException { + BulkResponse response = 
mock(BulkResponse.class); + when(response.hasFailures()).thenReturn(responseHasFailures); + return response; + } + + } + } + + + ///////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Integration test section below + // + // The tests below are meant to run on real ES instances, and are thus @Ignored during normal test execution. + // However if you wish to execute them as part of a test phase, comment out the @Ignored line for each + // desired test. + ///////////////////////////////////////////////////////////////////////////////////////////////////////////// + + /** + * Tests basic ES functionality against a local or test ES cluster + * @throws IOException + */ + @Test + @Ignore("Comment this out if you want to run against local or test ES") + public void testPutElasticSearchBasic() throws IOException { + System.out.println("Starting test " + new Object() { + }.getClass().getEnclosingMethod().getName()); + final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearch()); + runner.setValidateExpressionUsage(false); + //Local Cluster - Mac pulled from brew + runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); + runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(PutElasticsearch.INDEX, "tweet"); + runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); + + + runner.enqueue(twitterExample); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0); + + } + + @Test + @Ignore("Comment this out if you want to run against local or test ES") + public void testPutElasticSearchBatch() throws IOException { + System.out.println("Starting 
test " + new Object() { + }.getClass().getEnclosingMethod().getName()); + final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearch()); + runner.setValidateExpressionUsage(false); + //Local Cluster - Mac pulled from brew + runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); + runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(PutElasticsearch.INDEX, "tweet"); + runner.setProperty(PutElasticsearch.BATCH_SIZE, "100"); + + JsonParser parser = new JsonParser(); + JsonObject json; + String message = convertStreamToString(twitterExample); + for (int i = 0; i < 100; i++) { + + json = parser.parse(message).getAsJsonObject(); + String id = json.get("id").getAsString(); + long newId = Long.parseLong(id) + i; + json.addProperty("id", newId); + runner.enqueue(message.getBytes()); + + } + + runner.run(); + + runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 100); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0); + + } + + /** + * Convert an input stream to a stream + * + * @param is input the input stream + * @return return the converted input stream as a string + */ + static String convertStreamToString(java.io.InputStream is) { + java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A"); + return s.hasNext() ? 
s.next() : ""; + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml index 9945c3eac8..8255487021 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml @@ -7,12 +7,17 @@ org.apache.nifi nifi-nar-bundles - 0.4.1-SNAPSHOT + 0.4.2-SNAPSHOT org.apache.nifi nifi-elasticsearch-bundle pom + + + 5.3.1 + + nifi-elasticsearch-nar nifi-elasticsearch-processors diff --git a/pom.xml b/pom.xml index 563ecc3f92..7eb353271f 100644 --- a/pom.xml +++ b/pom.xml @@ -1032,6 +1032,12 @@ language governing permissions and limitations under the License. --> 0.4.2-SNAPSHOT nar + + org.apache.nifi + nifi-elasticsearch-nar + 0.4.2-SNAPSHOT + nar + org.apache.nifi nifi-properties @@ -1584,4 +1590,4 @@ language governing permissions and limitations under the License. --> - \ No newline at end of file +