Adding to PutElasticsearch - validators, unit tests, etc.

This commit is contained in:
Matt Burgess 2015-12-29 13:43:54 -05:00
parent e6cfcf40d0
commit 943d0a6e53
13 changed files with 833 additions and 418 deletions

View File

@ -267,6 +267,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-scripting-nar</artifactId> <artifactId>nifi-scripting-nar</artifactId>
<type>nar</type> <type>nar</type>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-elasticsearch-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies> </dependencies>
<properties> <properties>

View File

@ -6,10 +6,10 @@
<parent> <parent>
<artifactId>nifi-elasticsearch-bundle</artifactId> <artifactId>nifi-elasticsearch-bundle</artifactId>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<version>0.4.1-SNAPSHOT</version> <version>0.4.2-SNAPSHOT</version>
</parent> </parent>
<groupId>gov.pnnl.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-elasticsearch-nar</artifactId> <artifactId>nifi-elasticsearch-nar</artifactId>
<packaging>nar</packaging> <packaging>nar</packaging>
@ -17,7 +17,7 @@
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-elasticsearch-processors</artifactId> <artifactId>nifi-elasticsearch-processors</artifactId>
<version>0.4.1-SNAPSHOT</version> <version>${project.version}</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -0,0 +1,212 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
APACHE NIFI SUBCOMPONENTS:
The Apache NiFi project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for these
subcomponents is subject to the terms and conditions of the following
licenses.
The binary distribution of this product bundles 'Woodstox StAX 2 API' which is
"licensed under standard BSD license"

View File

@ -6,7 +6,7 @@
<parent> <parent>
<artifactId>nifi-elasticsearch-bundle</artifactId> <artifactId>nifi-elasticsearch-bundle</artifactId>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<version>0.4.1-SNAPSHOT</version> <version>0.4.2-SNAPSHOT</version>
</parent> </parent>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
@ -14,7 +14,7 @@
<properties> <properties>
<slf4jversion>1.7.12</slf4jversion> <slf4jversion>1.7.12</slf4jversion>
<es.version>1.7.1</es.version> <es.version>2.1.0</es.version>
<gsonversion>2.4</gsonversion> <gsonversion>2.4</gsonversion>
<jodatimeversion>2.9.1</jodatimeversion> <jodatimeversion>2.9.1</jodatimeversion>
</properties> </properties>
@ -28,6 +28,11 @@
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId> <artifactId>nifi-processor-utils</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>${lucene.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId> <artifactId>nifi-mock</artifactId>

View File

@ -1,232 +0,0 @@
package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import com.google.gson.*;
/**
 * Base class for processors that write JSON documents to ElasticSearch through a
 * {@link TransportClient}. It owns the client lifecycle (created when the processor is
 * scheduled, closed when it is stopped) and provides helpers that derive the target
 * index, type, id and source bytes from a parsed JSON document.
 */
public abstract class AbstractElasticSearchProcessor extends AbstractProcessor{

    protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder()
            .name("Cluster Name")
            .description("Name of the ES cluster. For example, elasticsearch_brew")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    protected static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder()
            .name("ElasticSearch Hosts")
            .description("ElasticSearch Hosts, which should be comma separated and colon for hostname/port " +
                    "host1:port,host2:port,.... For example testcluster:9300")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder()
            .name("ElasticSearch Ping Timeout")
            .description("The ping timeout used to determine when a node is unreachable. " +
                    "For example, 5s (5 seconds). If non-local recommended is 30s")
            .required(true)
            .defaultValue("5s")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder()
            .name("Sampler Interval")
            .description("Node sampler interval. For example, 5s (5 seconds) If non-local recommended is 30s")
            .required(true)
            .defaultValue("5s")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    protected static final PropertyDescriptor INDEX_STRATEGY = new PropertyDescriptor.Builder()
            .name("Index Strategy")
            .description("Pick the index strategy. Yearly, Monthly, Daily, Hourly")
            .required(true)
            .defaultValue("Monthly")
            .allowableValues("Yearly", "Monthly", "Daily", "Hourly")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Client shared by the processor; {@code null} while the processor is not scheduled. */
    protected TransportClient esClient;

    /** Addresses parsed from the {@link #HOSTS} property by the last call to {@link #createClient}. */
    protected List<InetSocketAddress> esHosts;

    protected String indexPrefix;

    // NOTE(review): this field is static but is assigned from the instance method
    // createClient(), so every processor instance in the JVM shares the most recently
    // scheduled strategy. It is left static here to keep the visible interface unchanged.
    protected static String indexStrategy;

    /**
     * Instantiates the ElasticSearch transport client from the processor properties and
     * registers every configured host with it. Any previously created client is closed first.
     *
     * @param context the process context supplying the property values
     * @throws IOException declared for callers; failures raised while building the client are
     *                     logged and rethrown unchanged
     */
    @OnScheduled
    public final void createClient(ProcessContext context) throws IOException {
        if (esClient != null) {
            closeClient();
        }

        getLogger().info("Creating ElasticSearch Client");

        try {
            final String clusterName = context.getProperty(CLUSTER_NAME).toString();
            final String pingTimeout = context.getProperty(PING_TIMEOUT).toString();
            final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).toString();
            indexStrategy = context.getProperty(INDEX_STRATEGY).toString();

            //create new transport client
            esClient = new TransportClient(
                    ImmutableSettings.builder()
                            .put("cluster.name", clusterName)
                            .put("client.transport.ping_timeout", pingTimeout)
                            .put("client.transport.nodes_sampler_interval", samplerInterval),
                    false);

            final String hosts = context.getProperty(HOSTS).toString();
            esHosts = GetEsHosts(hosts);

            for (final InetSocketAddress host : esHosts) {
                esClient.addTransportAddress(new InetSocketTransportAddress(host));
            }
        } catch (Exception e) {
            getLogger().error("Failed to schedule PutElasticSearch due to {}", new Object[] { e }, e);
            // Do not keep a half-configured client around when scheduling fails.
            closeClient();
            throw e;
        }
    }

    /**
     * Disposes of the ElasticSearch client, if one exists, and clears the reference so a
     * later {@link #createClient} starts from scratch.
     */
    @OnStopped
    public final void closeClient() {
        if (esClient != null) {
            getLogger().info("Closing ElasticSearch Client");
            esClient.close();
            esClient = null;
        }
    }

    /**
     * Parses the ElasticSearch hosts from the NiFi property value.
     *
     * @param hosts a comma-separated list of ElasticSearch hosts in {@code host:port} form
     * @return list of InetSocketAddresses for the ES hosts, in the order given
     * @throws IllegalArgumentException if an entry has no port or the port is not a number
     *                                  ({@link NumberFormatException} is a subclass)
     */
    private List<InetSocketAddress> GetEsHosts(String hosts){
        final List<InetSocketAddress> parsedHosts = new ArrayList<>();

        for (final String item : hosts.split(",")) {
            final String entry = item.trim();
            if (entry.isEmpty()) {
                // Tolerate stray separators such as a trailing comma.
                continue;
            }

            final String[] addresses = entry.split(":");
            if (addresses.length < 2) {
                throw new IllegalArgumentException(
                        "Invalid ElasticSearch host '" + entry + "', expected host:port");
            }

            final String hostName = addresses[0].trim();
            final int port = Integer.parseInt(addresses[1].trim());
            parsedHosts.add(new InetSocketAddress(hostName, port));
        }

        return parsedHosts;
    }

    /**
     * Gets the ElasticSearch index the document belongs to, derived from its
     * {@code created_at} field and the configured index strategy.
     *
     * @param input the parsed JSON document
     * @return the index name, e.g. {@code monthly_2015_12}
     * @throws IllegalArgumentException if {@code created_at} is missing or the strategy is unknown
     */
    public String getIndex(final JsonObject input) {
        return extractIndexString(input);
    }

    /**
     * Gets the ElasticSearch type for the document.
     *
     * @param input the parsed JSON document (not inspected)
     * @return always {@code "status"}
     */
    public String getType(final JsonObject input) {
        return "status";
    }

    /**
     * Gets the ElasticSearch id for the document.
     *
     * @param input the parsed JSON document; must contain an {@code id} member
     * @return the {@code id} member as a string
     */
    public String getId(final JsonObject input) {
        return input.get("id").getAsString();
    }

    /**
     * Gets the source bytes for ElasticSearch: the JSON text with every newline or carriage
     * return replaced by a space, encoded as UTF-8.
     *
     * @param input the parsed JSON document
     * @return the UTF-8 bytes of the single-line JSON text
     */
    public byte[] getSource(final JsonObject input) {
        String jsonString = input.toString();
        jsonString = jsonString.replace("\r\n", " ").replace('\n', ' ').replace('\r', ' ');
        return jsonString.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Identifies the ElasticSearch index where the document will land. The index name is the
     * lower-cased strategy, an underscore, and the {@code created_at} timestamp truncated to
     * the strategy's granularity.
     *
     * @param parsedJson the parsed JSON document; {@code created_at} must match
     *                   {@code EEE MMM dd HH:mm:ss Z yyyy} with English day/month names
     * @return the index name
     * @throws IllegalArgumentException if {@code created_at} is missing, cannot be parsed, or
     *                                  the index strategy is not one of the allowable values
     */
    private static String extractIndexString(final JsonObject parsedJson) {
        final String extractedDate = "created_at";
        if (!parsedJson.has(extractedDate)) {
            throw new IllegalArgumentException("Message is missing " + extractedDate);
        }

        final DateTimeFormatter format =
                DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy").withLocale(Locale.ENGLISH);
        final String dateElement = parsedJson.get(extractedDate).getAsString();

        // The previous round trip through an ISO-8601 string produced the same instant and
        // was dropped. NOTE(review): the bucket is presumably computed in the JVM default
        // time zone, as the formatter sets no explicit zone - confirm.
        final DateTime dateTime = format.parseDateTime(dateElement);

        final DateTimeFormatter dateFormat;
        //Create ElasticSearch Index
        switch (indexStrategy){
            case "Yearly":
                // Was "yyyy_MM", which silently produced one index per month.
                dateFormat = DateTimeFormat.forPattern("yyyy");
                break;
            case "Monthly":
                dateFormat = DateTimeFormat.forPattern("yyyy_MM");
                break;
            case "Daily":
                dateFormat = DateTimeFormat.forPattern("yyyy_MM_dd");
                break;
            case "Hourly":
                dateFormat = DateTimeFormat.forPattern("yyyy_MM_dd_HH");
                break;
            default:
                throw new IllegalArgumentException("Invalid index strategy selected: " + indexStrategy);
        }

        //ElasticSearch indexes must be lowercase; use a fixed locale so the result does not
        //depend on the JVM default locale
        return indexStrategy.toLowerCase(Locale.ENGLISH) + "_" + dateFormat.print(dateTime);
    }
}

View File

@ -0,0 +1,227 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import com.google.gson.JsonObject;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.elasticsearch.node.NodeBuilder;
public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
protected static final AllowableValue TRANSPORT_CLIENT =
new AllowableValue("transport", "Transport",
"Specifies a Transport Client be used to connect to the Elasticsearch cluster. A Transport "
+ "client does not join the cluster, and is better for a large number of connections "
+ "and/or if the NiFi node(s) and Elasticsearch nodes are mostly isolated via firewall.");
protected static final AllowableValue NODE_CLIENT =
new AllowableValue("node", "Node",
"Specifies a Node Client be used to connect to the Elasticsearch cluster. This client joins the "
+ "cluster, so operations are performed more quickly, but the NiFi node may need to be "
+ "configured such that it can successfully join the Elasticsearch cluster");
protected static final PropertyDescriptor CLIENT_TYPE = new PropertyDescriptor.Builder()
.name("Client type")
.description("The type of client used to connect to the Elasticsearch cluster. Transport client is more "
+ "isolated and lighter-weight, and Node client is faster and more integrated into the ES cluster")
.required(true)
.allowableValues(TRANSPORT_CLIENT, NODE_CLIENT)
.defaultValue(TRANSPORT_CLIENT.getValue())
.addValidator(Validator.VALID)
.build();
protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder()
.name("Cluster Name")
.description("Name of the ES cluster (for example, elasticsearch_brew). Defaults to 'elasticsearch'")
.required(false)
.addValidator(Validator.VALID)
.build();
protected static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder()
.name("ElasticSearch Hosts")
.description("ElasticSearch Hosts, which should be comma separated and colon for hostname/port "
+ "host1:port,host2:port,.... For example testcluster:9300. Note that this property is only "
+ "needed when using a Transport client, it is ignored when using a Node client")
.required(false)
.addValidator(new ElasticsearchClientValidator())
.build();
protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder()
.name("ElasticSearch Ping Timeout")
.description("The ping timeout used to determine when a node is unreachable. " +
"For example, 5s (5 seconds). If non-local recommended is 30s")
.required(true)
.defaultValue("5s")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder()
.name("Sampler Interval")
.description("Node sampler interval. For example, 5s (5 seconds) If non-local recommended is 30s")
.required(true)
.defaultValue("5s")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected Client esClient;
protected List<InetSocketAddress> esHosts;
/**
* Instantiate ElasticSearch Client
*
* @param context
* @throws IOException
*/
@OnScheduled
public void createClient(ProcessContext context) throws IOException {
ProcessorLog log = getLogger();
if (esClient != null) {
closeClient();
}
log.info("Creating ElasticSearch Client");
try {
final String clusterType = context.getProperty(CLIENT_TYPE).toString();
final String clusterName = context.getProperty(CLUSTER_NAME).toString();
final String pingTimeout = context.getProperty(PING_TIMEOUT).toString();
final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).toString();
if ("transport".equals(clusterType)) {
//create new transport client
Settings settings = Settings.settingsBuilder()
.put("cluster.name", clusterName)
.put("client.transport.ping_timeout", pingTimeout)
.put("client.transport.nodes_sampler_interval", samplerInterval)
.build();
TransportClient transportClient = TransportClient.builder().settings(settings).build();
final String hosts = context.getProperty(HOSTS).toString();
esHosts = GetEsHosts(hosts);
if (esHosts != null) {
for (final InetSocketAddress host : esHosts) {
transportClient.addTransportAddress(new InetSocketTransportAddress(host));
}
}
esClient = transportClient;
} else if ("node".equals(clusterType)) {
esClient = NodeBuilder.nodeBuilder().clusterName(clusterName).node().client();
}
} catch (Exception e) {
log.error("Failed to create Elasticsearch client due to {}", new Object[]{e}, e);
throw e;
}
}
/**
* Dispose of ElasticSearch client
*/
@OnStopped
public final void closeClient() {
if (esClient != null) {
getLogger().info("Closing ElasticSearch Client");
esClient.close();
esClient = null;
}
}
/**
* Get the ElasticSearch hosts from a Nifi attribute, e.g.
*
* @param hosts A comma-separated list of ElasticSearch hosts (host:port,host2:port2, etc.)
* @return List of InetSocketAddresses for the ES hosts
*/
private List<InetSocketAddress> GetEsHosts(String hosts) {
if (hosts == null) {
return null;
}
final List<String> esList = Arrays.asList(hosts.split(","));
List<InetSocketAddress> esHosts = new ArrayList<>();
for (String item : esList) {
String[] addresses = item.split(":");
final String hostName = addresses[0];
final int port = Integer.parseInt(addresses[1]);
esHosts.add(new InetSocketAddress(hostName, port));
}
return esHosts;
}
/**
* Get Source for ElasticSearch. The string representation of the JSON object is returned as a byte array after
* replacing newlines with spaces
*
* @param input a JSON object to be serialized to UTF-8
* @return a byte array containing the UTF-8 representation (without newlines) of the JSON object
*/
public byte[] getSource(final JsonObject input) {
String jsonString = input.toString();
jsonString = jsonString.replace("\r\n", " ").replace('\n', ' ').replace('\r', ' ');
return jsonString.getBytes(StandardCharsets.UTF_8);
}
/**
* A custom validator for the Elasticsearch properties list. For example, the hostnames property doesn't need to
* be filled in for a Node client, as it joins the cluster by name. Alternatively if a Transport client
*/
protected static class ElasticsearchClientValidator implements Validator {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
// Only validate hosts if cluster type == Transport
if (HOSTS.getName().equals(subject)) {
PropertyValue clientTypeProperty = context.getProperty(CLIENT_TYPE);
if (TRANSPORT_CLIENT.getValue().equals(clientTypeProperty.getValue())) {
return StandardValidators.NON_EMPTY_VALIDATOR.validate(
CLIENT_TYPE.getName(), clientTypeProperty.getValue(), context);
}
}
return VALID.validate(subject, input, context);
}
}
}

View File

@ -1,9 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch; package org.apache.nifi.processors.elasticsearch;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
@ -12,38 +31,67 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.AbstractElasticSearchProcessor; import org.elasticsearch.ElasticsearchParseException;
import org.apache.nifi.stream.io.StreamUtils; import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.*;
import org.elasticsearch.transport.ReceiveTimeoutTransportException; import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.*; import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import com.google.gson.*;
@EventDriven @EventDriven
@Tags({"elasticsearch", "insert", "update", "write", "put"}) @Tags({"elasticsearch", "insert", "update", "write", "put"})
@CapabilityDescription("Writes the contents of a FlowFile to ElasticSearch") @CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch")
public class PutElasticSearch extends AbstractElasticSearchProcessor { public class PutElasticsearch extends AbstractElasticsearchProcessor {
static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("All FlowFiles that are written to ElasticSearch are routed to this relationship").build(); .description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build();
static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("All FlowFiles that cannot be written to ElasticSearch are routed to this relationship").build(); .description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build();
static final Relationship REL_RETRY = new Relationship.Builder() static final Relationship REL_RETRY = new Relationship.Builder()
.name("retry") .name("retry")
.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
.build(); .build();
/**
 * Name of the FlowFile attribute that carries the identifier for each FlowFile.
 * The property is required, does not support expression language, and must be
 * non-empty. The visible onTrigger reads the named attribute from every FlowFile
 * and passes its value as the id argument of prepareIndex.
 */
public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("Identifier attribute")
.description("The name of the attribute containing the identifier for each FlowFile")
.required(true)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
/**
 * Name of the index documents are inserted into. The property is required and
 * supports expression language; its value is validated as an attribute
 * expression with a STRING result type.
 *
 * NOTE(review): the meaning of the boolean passed to
 * createAttributeExpressionLanguageValidator is not visible here - confirm
 * against StandardValidators.
 * NOTE(review): the visible onTrigger calls evaluateAttributeExpressions()
 * without a FlowFile, so the expression presumably cannot reference
 * per-FlowFile attributes - confirm.
 */
public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
.name("Index")
.description("The name of the index to insert into")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
AttributeExpression.ResultType.STRING, true))
.build();
/**
 * Document type handed to Elasticsearch for each indexed document. The property
 * is required and declared as supporting expression language; its value is
 * validated as an attribute expression with a STRING result type.
 *
 * NOTE(review): the visible onTrigger reads this property with getValue() and
 * does not call evaluateAttributeExpressions(), so an expression entered here
 * appears to be used verbatim rather than evaluated - confirm.
 * NOTE(review): the meaning of the boolean passed to
 * createAttributeExpressionLanguageValidator is not visible here - confirm
 * against StandardValidators.
 */
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("Type")
.description("The type of this document (used by Elasticsearch for indexing and searching)")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
AttributeExpression.ResultType.STRING, true))
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size") .name("Batch Size")
.description("The preferred number of FlowFiles to put to the database in a single transaction") .description("The preferred number of FlowFiles to put to the database in a single transaction")
@ -52,40 +100,40 @@ public class PutElasticSearch extends AbstractElasticSearchProcessor {
.defaultValue("100") .defaultValue("100")
.build(); .build();
private final List<PropertyDescriptor> descriptors;
private final Set<Relationship> relationships;
public PutElasticSearch() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(CLUSTER_NAME);
descriptors.add(HOSTS);
descriptors.add(PING_TIMEOUT);
descriptors.add(SAMPLER_INTERVAL);
descriptors.add(BATCH_SIZE);
descriptors.add(INDEX_STRATEGY);
this.descriptors = Collections.unmodifiableList(descriptors);
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>(); final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS); relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE); relationships.add(REL_FAILURE);
relationships.add(REL_RETRY); relationships.add(REL_RETRY);
this.relationships = Collections.unmodifiableSet(relationships); return Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
} }
@Override @Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors; final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(CLIENT_TYPE);
descriptors.add(CLUSTER_NAME);
descriptors.add(HOSTS);
descriptors.add(PING_TIMEOUT);
descriptors.add(SAMPLER_INTERVAL);
descriptors.add(ID_ATTRIBUTE);
descriptors.add(INDEX);
descriptors.add(TYPE);
descriptors.add(BATCH_SIZE);
return Collections.unmodifiableList(descriptors);
} }
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final String index = context.getProperty(INDEX).evaluateAttributeExpressions().getValue();
final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue();
final String docType = context.getProperty(TYPE).getValue();
final List<FlowFile> flowFiles = session.get(batchSize); final List<FlowFile> flowFiles = session.get(batchSize);
if (flowFiles.isEmpty()) { if (flowFiles.isEmpty()) {
return; return;
@ -94,13 +142,32 @@ public class PutElasticSearch extends AbstractElasticSearchProcessor {
final ProcessorLog logger = getLogger(); final ProcessorLog logger = getLogger();
try { try {
final BulkRequestBuilder bulk = GetEsBulkRequest(session, flowFiles); final BulkRequestBuilder bulk = esClient.prepareBulk();
for (FlowFile file : flowFiles) {
final String id = file.getAttribute(id_attribute);
if (id == null) {
getLogger().error("no value in identifier attribute {}", new Object[]{id_attribute});
throw new ProcessException("No value in identifier attribute " + id_attribute);
}
session.read(file, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
final InputStreamReader input = new InputStreamReader(in);
final JsonParser parser = new JsonParser();
final JsonObject json = parser.parse(input).getAsJsonObject();
bulk.add(esClient.prepareIndex(index, docType, id)
.setSource(getSource(json)));
}
});
}
final BulkResponse response = bulk.execute().actionGet(); final BulkResponse response = bulk.execute().actionGet();
if (response.hasFailures()) { if (response.hasFailures()) {
for (final BulkItemResponse item : response.getItems()) { for (final BulkItemResponse item : response.getItems()) {
final FlowFile flowFile = flowFiles.get(item.getItemId()); final FlowFile flowFile = flowFiles.get(item.getItemId());
if (item.isFailed()) { if (item.isFailed()) {
logger.error("Failed to insert {} into ElasticSearch due to {}", logger.error("Failed to insert {} into Elasticsearch due to {}",
new Object[]{flowFile, item.getFailure()}, new Exception()); new Object[]{flowFile, item.getFailure()}, new Exception());
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
@ -117,36 +184,28 @@ public class PutElasticSearch extends AbstractElasticSearchProcessor {
} catch (NoNodeAvailableException nne) { } catch (NoNodeAvailableException nne) {
logger.error("Failed to insert {} into ElasticSearch No Node Available {}", new Object[]{nne}, nne); logger.error("Failed to insert {} into Elasticsearch No Node Available {}", new Object[]{nne}, nne);
for (final FlowFile flowFile : flowFiles) { for (final FlowFile flowFile : flowFiles) {
session.transfer(flowFile, REL_RETRY); session.transfer(flowFile, REL_RETRY);
} }
context.yield(); context.yield();
} catch (ElasticsearchTimeoutException ete) { } catch (ElasticsearchTimeoutException ete) {
logger.error("Failed to insert {} into ElasticSearch Timeout to {}", new Object[]{ete}, ete); logger.error("Failed to insert {} into Elasticsearch Timeout to {}", new Object[]{ete}, ete);
for (final FlowFile flowFile : flowFiles) { for (final FlowFile flowFile : flowFiles) {
session.transfer(flowFile, REL_RETRY); session.transfer(flowFile, REL_RETRY);
} }
context.yield(); context.yield();
} catch (ReceiveTimeoutTransportException rtt) { } catch (ReceiveTimeoutTransportException rtt) {
logger.error("Failed to insert {} into ElasticSearch ReceiveTimeoutTransportException to {}", new Object[]{rtt}, rtt); logger.error("Failed to insert {} into Elasticsearch ReceiveTimeoutTransportException to {}", new Object[]{rtt}, rtt);
for (final FlowFile flowFile : flowFiles) { for (final FlowFile flowFile : flowFiles) {
session.transfer(flowFile, REL_RETRY); session.transfer(flowFile, REL_RETRY);
} }
context.yield(); context.yield();
} catch (ElasticsearchParseException esp) { } catch (ElasticsearchParseException esp) {
logger.error("Failed to insert {} into ElasticSearch Parse Exception {}", new Object[]{esp}, esp); logger.error("Failed to insert {} into Elasticsearch Parse Exception {}", new Object[]{esp}, esp);
for (final FlowFile flowFile : flowFiles) {
session.transfer(flowFile, REL_FAILURE);
}
context.yield();
} catch (ElasticsearchException e) {
logger.error("Failed to insert {} into ElasticSearch due to {}", new Object[]{e}, e);
for (final FlowFile flowFile : flowFiles) { for (final FlowFile flowFile : flowFiles) {
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
@ -154,7 +213,7 @@ public class PutElasticSearch extends AbstractElasticSearchProcessor {
context.yield(); context.yield();
} catch (Exception e) { } catch (Exception e) {
logger.error("Failed to insert {} into ElasticSearch due to {}", new Object[]{e}, e); logger.error("Failed to insert {} into Elasticsearch due to {}", new Object[]{e}, e);
for (final FlowFile flowFile : flowFiles) { for (final FlowFile flowFile : flowFiles) {
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
@ -163,35 +222,4 @@ public class PutElasticSearch extends AbstractElasticSearchProcessor {
} }
} }
/**
 * Builds the Elasticsearch bulk request for a batch of FlowFiles.
 *
 * Each FlowFile's content is read fully into memory, decoded as UTF-8 text,
 * parsed as a JSON object and added to the bulk request as one index operation.
 * The index, type, id and source of each operation are derived from the parsed
 * JSON by the getIndex/getType/getId/getSource helpers.
 *
 * @param session   ProcessSession used to read the FlowFile content
 * @param flowFiles FlowFiles pulled off of the queue to batch in
 * @return BulkRequestBuilder holding one index request per FlowFile
 */
private BulkRequestBuilder GetEsBulkRequest(final ProcessSession session, final List<FlowFile> flowFiles) {
    final BulkRequestBuilder bulk = esClient.prepareBulk();
    for (final FlowFile file : flowFiles) {
        // NOTE(review): the whole content is buffered in memory and the size is
        // narrowed to int, so this presumably assumes documents well under 2 GB - confirm.
        final byte[] content = new byte[(int) file.getSize()];
        session.read(file, new InputStreamCallback() {
            @Override
            public void process(final InputStream in) throws IOException {
                StreamUtils.fillBuffer(in, content, true);
                // Decode explicitly as UTF-8: the no-charset String constructor uses the
                // platform default, which corrupts non-ASCII JSON on hosts whose default
                // encoding is not UTF-8.
                final String input = new String(content, java.nio.charset.StandardCharsets.UTF_8);
                final JsonParser parser = new JsonParser();
                final JsonObject json = parser.parse(input).getAsJsonObject();
                bulk.add(esClient.prepareIndex(getIndex(json), getType(json), getId(json))
                        .setSource(getSource(json)));
            }
        });
    }
    return bulk;
}
} }

View File

@ -12,4 +12,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
org.apache.nifi.processors.elasticsearch.PutElasticSearch org.apache.nifi.processors.elasticsearch.PutElasticsearch

View File

@ -1,103 +0,0 @@
package org.apache.nifi.processors.elasticsearch;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.*;
import java.io.IOException;
import java.io.InputStream;
/**
 * Integration tests for {@link PutElasticSearch}.
 *
 * Both tests are {@code @Ignore}d because they need a reachable Elasticsearch
 * cluster; remove the annotation to run them against a local or test instance.
 */
public class TestPutElasticSearch {

    /** Sample tweet document loaded from the test classpath before each test. */
    private InputStream twitterExample;

    private TestRunner runner;

    @Before
    public void setUp() throws IOException {
        ClassLoader classloader = Thread.currentThread().getContextClassLoader();
        twitterExample = classloader.getResourceAsStream("TweetExample.json");
        // Fail with a clear message instead of a later NullPointerException.
        Assert.assertNotNull("TweetExample.json must be on the test classpath", twitterExample);
    }

    @After
    public void teardown() throws IOException {
        runner = null;
        // Release the classpath resource opened in setUp.
        if (twitterExample != null) {
            twitterExample.close();
            twitterExample = null;
        }
    }

    /**
     * Puts a single sample document and expects it on the success relationship.
     */
    @Test
    @Ignore("Comment this out if you want to run against local or test ES")
    public void testPutElasticSearchBasic() throws IOException {
        System.out.println("Starting test " + new Object() {
        }.getClass().getEnclosingMethod().getName());
        runner = TestRunners.newTestRunner(new PutElasticSearch());
        runner.setValidateExpressionUsage(false);
        //Local Cluster - Mac pulled from brew
        runner.setProperty(AbstractElasticSearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
        runner.setProperty(AbstractElasticSearchProcessor.HOSTS, "127.0.0.1:9300");
        runner.setProperty(AbstractElasticSearchProcessor.PING_TIMEOUT, "5s");
        runner.setProperty(AbstractElasticSearchProcessor.SAMPLER_INTERVAL, "5s");
        runner.setProperty(AbstractElasticSearchProcessor.INDEX_STRATEGY, "Monthly");
        runner.setProperty(PutElasticSearch.BATCH_SIZE, "1");

        runner.enqueue(twitterExample);
        runner.run();

        runner.assertAllFlowFilesTransferred(PutElasticSearch.REL_SUCCESS, 1);
        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticSearch.REL_SUCCESS).get(0);
        Assert.assertNotNull(out);
    }

    /**
     * Puts 100 documents, each with a distinct "id" field, in a single batch and
     * expects all of them on the success relationship.
     */
    @Test
    @Ignore("Comment this out if you want to run against local or test ES")
    public void testPutElasticSearchBatch() throws IOException {
        System.out.println("Starting test " + new Object() {
        }.getClass().getEnclosingMethod().getName());
        runner = TestRunners.newTestRunner(new PutElasticSearch());
        runner.setValidateExpressionUsage(false);
        //Local Cluster - Mac pulled from brew
        runner.setProperty(AbstractElasticSearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
        runner.setProperty(AbstractElasticSearchProcessor.HOSTS, "127.0.0.1:9300");
        runner.setProperty(AbstractElasticSearchProcessor.PING_TIMEOUT, "5s");
        runner.setProperty(AbstractElasticSearchProcessor.SAMPLER_INTERVAL, "5s");
        runner.setProperty(AbstractElasticSearchProcessor.INDEX_STRATEGY, "Monthly");
        runner.setProperty(PutElasticSearch.BATCH_SIZE, "100");

        final JsonParser parser = new JsonParser();
        final String message = convertStreamToString(twitterExample);
        for (int i = 0; i < 100; i++) {
            final JsonObject json = parser.parse(message).getAsJsonObject();
            final long newId = Long.parseLong(json.get("id").getAsString()) + i;
            json.addProperty("id", newId);
            // Enqueue the document carrying the new id. Previously the unmodified
            // message was enqueued, so all 100 FlowFiles were identical.
            runner.enqueue(json.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }

        runner.run();

        runner.assertAllFlowFilesTransferred(PutElasticSearch.REL_SUCCESS, 100);
        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticSearch.REL_SUCCESS).get(0);
        Assert.assertNotNull(out);
    }

    /**
     * Convert an input stream to a string.
     *
     * The stream is decoded as UTF-8 and is closed once it has been read.
     *
     * @param is the input stream to read
     * @return the full content of the stream, or an empty string if the stream is empty
     */
    static String convertStreamToString(java.io.InputStream is) {
        // "\\A" matches only at the start of input, so the first token is the whole stream.
        try (java.util.Scanner s = new java.util.Scanner(is, "UTF-8").useDelimiter("\\A")) {
            return s.hasNext() ? s.next() : "";
        }
    }
}

View File

@ -0,0 +1,262 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockProcessorInitializationContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.AbstractListenableActionFuture;
import org.elasticsearch.action.support.AdapterActionFuture;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
/**
 * Tests for {@link PutElasticsearch}.
 *
 * The unit tests run the processor against a mocked Elasticsearch client (see
 * {@link ElasticsearchTestProcessor}); the integration tests at the bottom need a
 * real cluster and are {@code @Ignore}d by default.
 */
public class TestPutElasticsearch {

    /** Name of the FlowFile attribute the tests use to carry the document identifier. */
    private static final String ID_ATTRIBUTE_NAME = "tweet_id";

    /** Identifier attached to the sample tweet FlowFile. */
    private static final String TWEET_ID = "28039652140";

    /** Sample tweet document loaded from the test classpath before each test. */
    private InputStream twitterExample;

    private TestRunner runner;

    @Before
    public void setUp() throws IOException {
        ClassLoader classloader = Thread.currentThread().getContextClassLoader();
        twitterExample = classloader.getResourceAsStream("TweetExample.json");
        // Fail with a clear message instead of a later NullPointerException.
        assertNotNull("TweetExample.json must be on the test classpath", twitterExample);
    }

    @After
    public void teardown() throws IOException {
        runner = null;
        // Release the classpath resource opened in setUp.
        if (twitterExample != null) {
            twitterExample.close();
            twitterExample = null;
        }
    }

    @Test
    public void testPutElasticSearchOnTrigger() throws IOException {
        runner = TestRunners.newTestRunner(new ElasticsearchTestProcessor(false)); // no failures
        runner.setValidateExpressionUsage(false);
        configureConnection(runner);
        runner.setProperty(PutElasticsearch.INDEX, "tweet");
        // TYPE and ID_ATTRIBUTE are required, so the processor must stay invalid until both are set.
        runner.assertNotValid();
        runner.setProperty(PutElasticsearch.TYPE, "status");
        runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
        runner.assertNotValid();
        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, ID_ATTRIBUTE_NAME);
        runner.assertValid();

        runner.enqueue(twitterExample, idAttribute(TWEET_ID));
        runner.run(1, true, true);

        runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
        assertNotNull(out);
        out.assertAttributeEquals(ID_ATTRIBUTE_NAME, TWEET_ID);
    }

    @Test
    public void testPutElasticSearchOnTriggerWithFailures() throws IOException {
        runner = TestRunners.newTestRunner(new ElasticsearchTestProcessor(true)); // simulate failures
        runner.setValidateExpressionUsage(false);
        configureConnection(runner);
        runner.setProperty(PutElasticsearch.INDEX, "tweet");
        runner.setProperty(PutElasticsearch.TYPE, "status");
        runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, ID_ATTRIBUTE_NAME);

        runner.enqueue(twitterExample, idAttribute(TWEET_ID));
        runner.run(1, true, true);

        runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_FAILURE, 1);
        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_FAILURE).get(0);
        assertNotNull(out);
        out.assertAttributeEquals(ID_ATTRIBUTE_NAME, TWEET_ID);
    }

    /**
     * Sets the cluster connection properties shared by every test in this class.
     */
    private static void configureConnection(final TestRunner testRunner) {
        //Local Cluster - Mac pulled from brew
        testRunner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
        testRunner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
        testRunner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
        testRunner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s");
    }

    /**
     * Builds the FlowFile attribute map that carries the given document identifier.
     */
    private static java.util.Map<String, String> idAttribute(final String id) {
        final java.util.Map<String, String> attributes = new HashMap<>();
        attributes.put(ID_ATTRIBUTE_NAME, id);
        return attributes;
    }

    /**
     * A Test class that extends the processor in order to inject/mock behavior
     */
    private static class ElasticsearchTestProcessor extends PutElasticsearch {

        /** Value the mocked bulk response returns from hasFailures(). */
        private final boolean responseHasFailures;

        public ElasticsearchTestProcessor(boolean responseHasFailures) {
            this.responseHasFailures = responseHasFailures;
        }

        /**
         * Replaces the real client with a Mockito mock whose bulk request returns a
         * canned future and whose index requests are plain builders, so no cluster
         * is contacted.
         */
        @Override
        @OnScheduled
        public void createClient(ProcessContext context) throws IOException {
            esClient = mock(Client.class);
            // Spy on a real builder so add() keeps working while execute() is intercepted.
            BulkRequestBuilder bulkRequestBuilder = spy(new BulkRequestBuilder(esClient, BulkAction.INSTANCE));
            doReturn(new MockBulkRequestBuilderExecutor(responseHasFailures)).when(bulkRequestBuilder).execute();
            when(esClient.prepareBulk()).thenReturn(bulkRequestBuilder);

            IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(esClient, IndexAction.INSTANCE);
            when(esClient.prepareIndex(anyString(), anyString(), anyString())).thenReturn(indexRequestBuilder);
        }

        /**
         * Future returned from the mocked bulk execute(); it hands back a mocked
         * BulkResponse instead of waiting on a real request.
         */
        private static class MockBulkRequestBuilderExecutor
                extends AdapterActionFuture<BulkResponse, ActionListener<BulkResponse>>
                implements ListenableActionFuture<BulkResponse> {

            private final boolean responseHasFailures;

            public MockBulkRequestBuilderExecutor(boolean responseHasFailures) {
                this.responseHasFailures = responseHasFailures;
            }

            @Override
            protected BulkResponse convert(ActionListener<BulkResponse> bulkResponseActionListener) {
                return null;
            }

            @Override
            public void addListener(ActionListener<BulkResponse> actionListener) {
            }

            // NOTE(review): the processor calls actionGet(); presumably AdapterActionFuture
            // delegates that to get() - confirm against the Elasticsearch version in use.
            // NOTE(review): getItems() is not stubbed on the mocked response, so the failure
            // test presumably reaches REL_FAILURE through the processor's generic exception
            // handler rather than its per-item loop - confirm.
            @Override
            public BulkResponse get() throws InterruptedException, ExecutionException {
                BulkResponse response = mock(BulkResponse.class);
                when(response.hasFailures()).thenReturn(responseHasFailures);
                return response;
            }
        }
    }


    /////////////////////////////////////////////////////////////////////////////////////////////////////////////
    // Integration test section below
    //
    // The tests below are meant to run on real ES instances, and are thus @Ignore'd during normal test execution.
    // However if you wish to execute them as part of a test phase, comment out the @Ignore line for each
    // desired test.
    /////////////////////////////////////////////////////////////////////////////////////////////////////////////

    /**
     * Tests basic ES functionality against a local or test ES cluster
     * @throws IOException
     */
    @Test
    @Ignore("Comment this out if you want to run against local or test ES")
    public void testPutElasticSearchBasic() throws IOException {
        System.out.println("Starting test " + new Object() {
        }.getClass().getEnclosingMethod().getName());
        runner = TestRunners.newTestRunner(new PutElasticsearch());
        runner.setValidateExpressionUsage(false);
        configureConnection(runner);
        runner.setProperty(PutElasticsearch.INDEX, "tweet");
        // TYPE and ID_ATTRIBUTE are required properties; without them the processor is
        // invalid and never runs.
        runner.setProperty(PutElasticsearch.TYPE, "status");
        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, ID_ATTRIBUTE_NAME);
        runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");

        // The processor takes the document id from the FlowFile attribute, so it must be supplied.
        runner.enqueue(twitterExample, idAttribute(TWEET_ID));
        runner.run(1, true, true);

        runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
        assertNotNull(out);
    }

    /**
     * Tests a batch of 100 documents, each with a distinct identifier, against a
     * local or test ES cluster
     * @throws IOException
     */
    @Test
    @Ignore("Comment this out if you want to run against local or test ES")
    public void testPutElasticSearchBatch() throws IOException {
        System.out.println("Starting test " + new Object() {
        }.getClass().getEnclosingMethod().getName());
        runner = TestRunners.newTestRunner(new PutElasticsearch());
        runner.setValidateExpressionUsage(false);
        configureConnection(runner);
        runner.setProperty(PutElasticsearch.INDEX, "tweet");
        runner.setProperty(PutElasticsearch.TYPE, "status");
        runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, ID_ATTRIBUTE_NAME);
        runner.setProperty(PutElasticsearch.BATCH_SIZE, "100");

        final JsonParser parser = new JsonParser();
        final String message = convertStreamToString(twitterExample);
        for (int i = 0; i < 100; i++) {
            final JsonObject json = parser.parse(message).getAsJsonObject();
            final long newId = Long.parseLong(json.get("id").getAsString()) + i;
            json.addProperty("id", newId);
            // Enqueue the modified document with its id as the identifier attribute.
            // Previously the unmodified message was enqueued with no attribute at all.
            runner.enqueue(json.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8),
                    idAttribute(String.valueOf(newId)));
        }

        runner.run();

        runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 100);
        final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
        assertNotNull(out);
    }

    /**
     * Convert an input stream to a string.
     *
     * The stream is decoded as UTF-8 and is closed once it has been read.
     *
     * @param is the input stream to read
     * @return the full content of the stream, or an empty string if the stream is empty
     */
    static String convertStreamToString(java.io.InputStream is) {
        // "\\A" matches only at the start of input, so the first token is the whole stream.
        try (java.util.Scanner s = new java.util.Scanner(is, "UTF-8").useDelimiter("\\A")) {
            return s.hasNext() ? s.next() : "";
        }
    }
}

View File

@ -7,12 +7,17 @@
<parent> <parent>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId> <artifactId>nifi-nar-bundles</artifactId>
<version>0.4.1-SNAPSHOT</version> <version>0.4.2-SNAPSHOT</version>
</parent> </parent>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-elasticsearch-bundle</artifactId> <artifactId>nifi-elasticsearch-bundle</artifactId>
<packaging>pom</packaging> <packaging>pom</packaging>
<properties>
<lucene.version>5.3.1</lucene.version>
</properties>
<modules> <modules>
<module>nifi-elasticsearch-nar</module> <module>nifi-elasticsearch-nar</module>
<module>nifi-elasticsearch-processors</module> <module>nifi-elasticsearch-processors</module>

View File

@ -1032,6 +1032,12 @@ language governing permissions and limitations under the License. -->
<version>0.4.2-SNAPSHOT</version> <version>0.4.2-SNAPSHOT</version>
<type>nar</type> <type>nar</type>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-elasticsearch-nar</artifactId>
<version>0.4.2-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId> <artifactId>nifi-properties</artifactId>