mirror of https://github.com/apache/nifi.git
NIFI-1275: Added FetchElasticsearch processor and support for secure clusters
This closes #180. Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
parent
0c137bc22d
commit
75af3a2ebe
|
@ -790,6 +790,21 @@ The following binary components are provided under the Apache Software License v
|
|||
This product includes software developed by
|
||||
The Groovy community (http://groovy.codehaus.org/).
|
||||
|
||||
(ASLv2) Carrotsearch HPPC
|
||||
The following NOTICE information applies:
|
||||
HPPC borrowed code, ideas or both from:
|
||||
|
||||
* Apache Lucene, http://lucene.apache.org/
|
||||
(Apache license)
|
||||
* Fastutil, http://fastutil.di.unimi.it/
|
||||
(Apache license)
|
||||
* Koloboke, https://github.com/OpenHFT/Koloboke
|
||||
(Apache license)
|
||||
|
||||
(ASLv2) t-digest
|
||||
The following NOTICE information applies:
|
||||
The code for the t-digest was originally authored by Ted Dunning
|
||||
A number of small but very helpful changes have been contributed by Adrien Grand (https://github.com/jpountz)
|
||||
|
||||
************************
|
||||
Common Development and Distribution License 1.1
|
||||
|
@ -913,3 +928,8 @@ The following binary components are provided to the 'Public Domain'. See projec
|
|||
|
||||
(Public Domain) XZ for Java (org.tukaani:xz:jar:1.5 - http://tukaani.org/xz/java.html
|
||||
(Public Domain) AOP Alliance 1.0 (http://aopalliance.sourceforge.net/)
|
||||
|
||||
The following binary components are provided under the Creative Commons Zero license version 1.0. See project link for details.
|
||||
|
||||
(CC0v1.0) JSR166e for Twitter (com.twitter:jsr166e:jar:1.1.0 - https://github.com/twitter/jsr166e)
|
||||
|
||||
|
|
|
@ -1,4 +1,14 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
|
||||
license agreements. See the NOTICE file distributed with this work for additional
|
||||
information regarding copyright ownership. The ASF licenses this file to
|
||||
You under the Apache License, Version 2.0 (the "License"); you may not use
|
||||
this file except in compliance with the License. You may obtain a copy of
|
||||
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
|
||||
by applicable law or agreed to in writing, software distributed under the
|
||||
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
|
||||
OF ANY KIND, either express or implied. See the License for the specific
|
||||
language governing permissions and limitations under the License. -->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
|
@ -14,10 +24,14 @@
|
|||
<packaging>nar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-standard-services-api-nar</artifactId>
|
||||
<type>nar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-elasticsearch-processors</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
|
|
|
@ -208,5 +208,32 @@ notices and license terms. Your use of the source code for the these
|
|||
subcomponents is subject to the terms and conditions of the following
|
||||
licenses.
|
||||
|
||||
The binary distribution of this product bundles 'Woodstox StAX 2 API' which is
|
||||
"licensed under standard BSD license"
|
||||
The binary distribution of this product bundles 'HdrHistogram' which is available under a 2-Clause BSD style license:
|
||||
|
||||
Copyright (c) 2012, 2013, 2014 Gil Tene
|
||||
Copyright (c) 2014 Michael Barker
|
||||
Copyright (c) 2014 Matt Warren
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are met:
|
||||
|
||||
1. Redistributions of source code must retain the above copyright notice,
|
||||
this list of conditions and the following disclaimer.
|
||||
|
||||
2. Redistributions in binary form must reproduce the above copyright notice,
|
||||
this list of conditions and the following disclaimer in the documentation
|
||||
and/or other materials provided with the distribution.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
|
||||
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||||
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||||
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
|
||||
THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
|
|
|
@ -10,15 +10,243 @@ Apache Software License v2
|
|||
|
||||
The following binary components are provided under the Apache Software License v2
|
||||
|
||||
(ASLv2) Apache Commons Lang
|
||||
(ASLv2) Elasticsearch
|
||||
The following NOTICE information applies:
|
||||
Apache Commons Lang
|
||||
Copyright 2001-2014 The Apache Software Foundation
|
||||
|
||||
This product includes software from the Spring Framework,
|
||||
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
|
||||
Elasticsearch
|
||||
Copyright 2009-2015 Elasticsearch
|
||||
|
||||
(ASLv2) Apache Commons IO
|
||||
The following NOTICE information applies:
|
||||
Apache Commons IO
|
||||
Copyright 2002-2012 The Apache Software Foundation
|
||||
|
||||
(ASLv2) Apache Lucene
|
||||
The following NOTICE information applies:
|
||||
Apache Lucene
|
||||
Copyright 2014 The Apache Software Foundation
|
||||
|
||||
Includes software from other Apache Software Foundation projects,
|
||||
including, but not limited to:
|
||||
- Apache Ant
|
||||
- Apache Jakarta Regexp
|
||||
- Apache Commons
|
||||
- Apache Xerces
|
||||
|
||||
ICU4J, (under analysis/icu) is licensed under an MIT styles license
|
||||
and Copyright (c) 1995-2008 International Business Machines Corporation and others
|
||||
|
||||
Some data files (under analysis/icu/src/data) are derived from Unicode data such
|
||||
as the Unicode Character Database. See http://unicode.org/copyright.html for more
|
||||
details.
|
||||
|
||||
Brics Automaton (under core/src/java/org/apache/lucene/util/automaton) is
|
||||
BSD-licensed, created by Anders Møller. See http://www.brics.dk/automaton/
|
||||
|
||||
The levenshtein automata tables (under core/src/java/org/apache/lucene/util/automaton) were
|
||||
automatically generated with the moman/finenight FSA library, created by
|
||||
Jean-Philippe Barrette-LaPierre. This library is available under an MIT license,
|
||||
see http://sites.google.com/site/rrettesite/moman and
|
||||
http://bitbucket.org/jpbarrette/moman/overview/
|
||||
|
||||
The class org.apache.lucene.util.WeakIdentityMap was derived from
|
||||
the Apache CXF project and is Apache License 2.0.
|
||||
|
||||
The Google Code Prettify is Apache License 2.0.
|
||||
See http://code.google.com/p/google-code-prettify/
|
||||
|
||||
JUnit (junit-4.10) is licensed under the Common Public License v. 1.0
|
||||
See http://junit.sourceforge.net/cpl-v10.html
|
||||
|
||||
This product includes code (JaspellTernarySearchTrie) from Java Spelling Checkin
|
||||
g Package (jaspell): http://jaspell.sourceforge.net/
|
||||
License: The BSD License (http://www.opensource.org/licenses/bsd-license.php)
|
||||
|
||||
The snowball stemmers in
|
||||
analysis/common/src/java/net/sf/snowball
|
||||
were developed by Martin Porter and Richard Boulton.
|
||||
The snowball stopword lists in
|
||||
analysis/common/src/resources/org/apache/lucene/analysis/snowball
|
||||
were developed by Martin Porter and Richard Boulton.
|
||||
The full snowball package is available from
|
||||
http://snowball.tartarus.org/
|
||||
|
||||
The KStem stemmer in
|
||||
analysis/common/src/org/apache/lucene/analysis/en
|
||||
was developed by Bob Krovetz and Sergio Guzman-Lara (CIIR-UMass Amherst)
|
||||
under the BSD-license.
|
||||
|
||||
The Arabic,Persian,Romanian,Bulgarian, and Hindi analyzers (common) come with a default
|
||||
stopword list that is BSD-licensed created by Jacques Savoy. These files reside in:
|
||||
analysis/common/src/resources/org/apache/lucene/analysis/ar/stopwords.txt,
|
||||
analysis/common/src/resources/org/apache/lucene/analysis/fa/stopwords.txt,
|
||||
analysis/common/src/resources/org/apache/lucene/analysis/ro/stopwords.txt,
|
||||
analysis/common/src/resources/org/apache/lucene/analysis/bg/stopwords.txt,
|
||||
analysis/common/src/resources/org/apache/lucene/analysis/hi/stopwords.txt
|
||||
See http://members.unine.ch/jacques.savoy/clef/index.html.
|
||||
|
||||
The German,Spanish,Finnish,French,Hungarian,Italian,Portuguese,Russian and Swedish light stemmers
|
||||
(common) are based on BSD-licensed reference implementations created by Jacques Savoy and
|
||||
Ljiljana Dolamic. These files reside in:
|
||||
analysis/common/src/java/org/apache/lucene/analysis/de/GermanLightStemmer.java
|
||||
analysis/common/src/java/org/apache/lucene/analysis/de/GermanMinimalStemmer.java
|
||||
analysis/common/src/java/org/apache/lucene/analysis/es/SpanishLightStemmer.java
|
||||
analysis/common/src/java/org/apache/lucene/analysis/fi/FinnishLightStemmer.java
|
||||
analysis/common/src/java/org/apache/lucene/analysis/fr/FrenchLightStemmer.java
|
||||
analysis/common/src/java/org/apache/lucene/analysis/fr/FrenchMinimalStemmer.java
|
||||
analysis/common/src/java/org/apache/lucene/analysis/hu/HungarianLightStemmer.java
|
||||
analysis/common/src/java/org/apache/lucene/analysis/it/ItalianLightStemmer.java
|
||||
analysis/common/src/java/org/apache/lucene/analysis/pt/PortugueseLightStemmer.java
|
||||
analysis/common/src/java/org/apache/lucene/analysis/ru/RussianLightStemmer.java
|
||||
analysis/common/src/java/org/apache/lucene/analysis/sv/SwedishLightStemmer.java
|
||||
|
||||
The Stempel analyzer (stempel) includes BSD-licensed software developed
|
||||
by the Egothor project http://egothor.sf.net/, created by Leo Galambos, Martin Kvapil,
|
||||
and Edmond Nolan.
|
||||
|
||||
The Polish analyzer (stempel) comes with a default
|
||||
stopword list that is BSD-licensed created by the Carrot2 project. The file resides
|
||||
in stempel/src/resources/org/apache/lucene/analysis/pl/stopwords.txt.
|
||||
See http://project.carrot2.org/license.html.
|
||||
|
||||
The SmartChineseAnalyzer source code (smartcn) was
|
||||
provided by Xiaoping Gao and copyright 2009 by www.imdict.net.
|
||||
|
||||
WordBreakTestUnicode_*.java (under modules/analysis/common/src/test/)
|
||||
is derived from Unicode data such as the Unicode Character Database.
|
||||
See http://unicode.org/copyright.html for more details.
|
||||
|
||||
The Morfologik analyzer (morfologik) includes BSD-licensed software
|
||||
developed by Dawid Weiss and Marcin Miłkowski (http://morfologik.blogspot.com/).
|
||||
|
||||
Morfologik uses data from Polish ispell/myspell dictionary
|
||||
(http://www.sjp.pl/slownik/en/) licenced on the terms of (inter alia)
|
||||
LGPL and Creative Commons ShareAlike.
|
||||
|
||||
Morfologic includes data from BSD-licensed dictionary of Polish (SGJP)
|
||||
(http://sgjp.pl/morfeusz/)
|
||||
|
||||
Servlet-api.jar and javax.servlet-*.jar are under the CDDL license, the original
|
||||
source code for this can be found at http://www.eclipse.org/jetty/downloads.php
|
||||
|
||||
===========================================================================
|
||||
Kuromoji Japanese Morphological Analyzer - Apache Lucene Integration
|
||||
===========================================================================
|
||||
|
||||
This software includes a binary and/or source version of data from
|
||||
|
||||
mecab-ipadic-2.7.0-20070801
|
||||
|
||||
which can be obtained from
|
||||
|
||||
http://atilika.com/releases/mecab-ipadic/mecab-ipadic-2.7.0-20070801.tar.gz
|
||||
|
||||
or
|
||||
|
||||
http://jaist.dl.sourceforge.net/project/mecab/mecab-ipadic/2.7.0-20070801/mecab-ipadic-2.7.0-20070801.tar.gz
|
||||
|
||||
===========================================================================
|
||||
mecab-ipadic-2.7.0-20070801 Notice
|
||||
===========================================================================
|
||||
|
||||
Nara Institute of Science and Technology (NAIST),
|
||||
the copyright holders, disclaims all warranties with regard to this
|
||||
software, including all implied warranties of merchantability and
|
||||
fitness, in no event shall NAIST be liable for
|
||||
any special, indirect or consequential damages or any damages
|
||||
whatsoever resulting from loss of use, data or profits, whether in an
|
||||
action of contract, negligence or other tortuous action, arising out
|
||||
of or in connection with the use or performance of this software.
|
||||
|
||||
A large portion of the dictionary entries
|
||||
originate from ICOT Free Software. The following conditions for ICOT
|
||||
Free Software applies to the current dictionary as well.
|
||||
|
||||
Each User may also freely distribute the Program, whether in its
|
||||
original form or modified, to any third party or parties, PROVIDED
|
||||
that the provisions of Section 3 ("NO WARRANTY") will ALWAYS appear
|
||||
on, or be attached to, the Program, which is distributed substantially
|
||||
in the same form as set out herein and that such intended
|
||||
distribution, if actually made, will neither violate or otherwise
|
||||
contravene any of the laws and regulations of the countries having
|
||||
jurisdiction over the User or the intended distribution itself.
|
||||
|
||||
NO WARRANTY
|
||||
|
||||
The program was produced on an experimental basis in the course of the
|
||||
research and development conducted during the project and is provided
|
||||
to users as so produced on an experimental basis. Accordingly, the
|
||||
program is provided without any warranty whatsoever, whether express,
|
||||
implied, statutory or otherwise. The term "warranty" used herein
|
||||
includes, but is not limited to, any warranty of the quality,
|
||||
performance, merchantability and fitness for a particular purpose of
|
||||
the program and the nonexistence of any infringement or violation of
|
||||
any right of any third party.
|
||||
|
||||
Each user of the program will agree and understand, and be deemed to
|
||||
have agreed and understood, that there is no warranty whatsoever for
|
||||
the program and, accordingly, the entire risk arising from or
|
||||
otherwise connected with the program is assumed by the user.
|
||||
|
||||
Therefore, neither ICOT, the copyright holder, or any other
|
||||
organization that participated in or was otherwise related to the
|
||||
development of the program and their respective officials, directors,
|
||||
officers and other employees shall be held liable for any and all
|
||||
damages, including, without limitation, general, special, incidental
|
||||
and consequential damages, arising out of or otherwise in connection
|
||||
with the use or inability to use the program or any product, material
|
||||
or result produced or otherwise obtained by using the program,
|
||||
regardless of whether they have been advised of, or otherwise had
|
||||
knowledge of, the possibility of such damages at any time during the
|
||||
project or thereafter. Each user will be deemed to have agreed to the
|
||||
foregoing by his or her commencement of use of the program. The term
|
||||
"use" as used herein includes, but is not limited to, the use,
|
||||
modification, copying and distribution of the program and the
|
||||
production of secondary products from the program.
|
||||
|
||||
In the case where the program, whether in its original form or
|
||||
modified, was distributed or delivered to or received by a user from
|
||||
any person, organization or entity other than ICOT, unless it makes or
|
||||
grants independently of ICOT any specific warranty to the user in
|
||||
writing, such person, organization or entity, will also be exempted
|
||||
from and not be held liable to the user for any such damages as noted
|
||||
above as far as the program is concerned.
|
||||
|
||||
(ASLv2) Carrotsearch HPPC
|
||||
The following NOTICE information applies:
|
||||
HPPC borrowed code, ideas or both from:
|
||||
|
||||
* Apache Lucene, http://lucene.apache.org/
|
||||
(Apache license)
|
||||
* Fastutil, http://fastutil.di.unimi.it/
|
||||
(Apache license)
|
||||
* Koloboke, https://github.com/OpenHFT/Koloboke
|
||||
(Apache license)
|
||||
|
||||
(ASLv2) Joda Time
|
||||
The following NOTICE information applies:
|
||||
This product includes software developed by
|
||||
Joda.org (http://www.joda.org/).
|
||||
|
||||
(ASLv2) The Netty Project
|
||||
The following NOTICE information applies:
|
||||
The Netty Project
|
||||
Copyright 2011 The Netty Project
|
||||
|
||||
(ASLv2) t-digest
|
||||
The following NOTICE information applies:
|
||||
The code for the t-digest was originally authored by Ted Dunning
|
||||
A number of small but very helpful changes have been contributed by Adrien Grand (https://github.com/jpountz)
|
||||
|
||||
(ASLv2) Apache Commons-CLI
|
||||
The following NOTICE information applies:
|
||||
Apache Commons CLI
|
||||
Copyright 2001-2016 The Apache Software Foundation
|
||||
|
||||
*****************
|
||||
Public Domain
|
||||
*****************
|
||||
|
||||
The following binary components are provided under the Creative Commons Zero license version 1.0. See project link for details.
|
||||
|
||||
(CC0v1.0) JSR166e for Twitter (com.twitter:jsr166e:jar:1.1.0 - https://github.com/twitter/jsr166e)
|
||||
|
||||
|
|
|
@ -1,4 +1,14 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
|
||||
license agreements. See the NOTICE file distributed with this work for additional
|
||||
information regarding copyright ownership. The ASF licenses this file to
|
||||
You under the Apache License, Version 2.0 (the "License"); you may not use
|
||||
this file except in compliance with the License. You may obtain a copy of
|
||||
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
|
||||
by applicable law or agreed to in writing, software distributed under the
|
||||
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
|
||||
OF ANY KIND, either express or implied. See the License for the specific
|
||||
language governing permissions and limitations under the License. -->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
|
@ -9,20 +19,24 @@
|
|||
<version>0.4.2-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-elasticsearch-processors</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<properties>
|
||||
<slf4jversion>1.7.12</slf4jversion>
|
||||
<es.version>2.1.0</es.version>
|
||||
<gsonversion>2.4</gsonversion>
|
||||
<jodatimeversion>2.9.1</jodatimeversion>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-properties</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
|
@ -38,30 +52,23 @@
|
|||
<artifactId>nifi-mock</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>${slf4jversion}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-simple</artifactId>
|
||||
<version>${slf4jversion}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.elasticsearch</groupId>
|
||||
<artifactId>elasticsearch</artifactId>
|
||||
<version>${es.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.code.gson</groupId>
|
||||
<artifactId>gson</artifactId>
|
||||
<version>${gsonversion}</version>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-ssl-context-service-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>joda-time</groupId>
|
||||
<artifactId>joda-time</artifactId>
|
||||
<version>${jodatimeversion}</version>
|
||||
<groupId>commons-io</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-ssl-context-service</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
|
|
|
@ -16,84 +16,115 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.elasticsearch;
|
||||
|
||||
import com.google.gson.JsonObject;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.components.Validator;
|
||||
import org.apache.nifi.logging.ProcessorLog;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.File;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.elasticsearch.node.NodeBuilder;
|
||||
|
||||
public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
|
||||
|
||||
protected static final AllowableValue TRANSPORT_CLIENT =
|
||||
new AllowableValue("transport", "Transport",
|
||||
"Specifies a Transport Client be used to connect to the Elasticsearch cluster. A Transport "
|
||||
+ "client does not join the cluster, and is better for a large number of connections "
|
||||
+ "and/or if the NiFi node(s) and Elasticsearch nodes are mostly isolated via firewall.");
|
||||
|
||||
protected static final AllowableValue NODE_CLIENT =
|
||||
new AllowableValue("node", "Node",
|
||||
"Specifies a Node Client be used to connect to the Elasticsearch cluster. This client joins the "
|
||||
+ "cluster, so operations are performed more quickly, but the NiFi node may need to be "
|
||||
+ "configured such that it can successfully join the Elasticsearch cluster");
|
||||
|
||||
protected static final PropertyDescriptor CLIENT_TYPE = new PropertyDescriptor.Builder()
|
||||
.name("Client type")
|
||||
.description("The type of client used to connect to the Elasticsearch cluster. Transport client is more "
|
||||
+ "isolated and lighter-weight, and Node client is faster and more integrated into the ES cluster")
|
||||
.required(true)
|
||||
.allowableValues(TRANSPORT_CLIENT, NODE_CLIENT)
|
||||
.defaultValue(TRANSPORT_CLIENT.getValue())
|
||||
.addValidator(Validator.VALID)
|
||||
.build();
|
||||
/**
|
||||
* This validator ensures the Elasticsearch hosts property is a valid list of hostname:port entries
|
||||
*/
|
||||
private static final Validator HOSTNAME_PORT_VALIDATOR = new Validator() {
|
||||
@Override
|
||||
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
|
||||
final List<String> esList = Arrays.asList(input.split(","));
|
||||
for (String hostnamePort : esList) {
|
||||
String[] addresses = hostnamePort.split(":");
|
||||
// Protect against invalid input like http://127.0.0.1:9300 (URL scheme should not be there)
|
||||
if (addresses.length != 2) {
|
||||
return new ValidationResult.Builder().subject(subject).input(input).explanation(
|
||||
"Must be in hostname:port form (no scheme such as http://").valid(false).build();
|
||||
}
|
||||
}
|
||||
return new ValidationResult.Builder().subject(subject).input(input).explanation(
|
||||
"Valid cluster definition").valid(true).build();
|
||||
}
|
||||
};
|
||||
|
||||
protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder()
|
||||
.name("Cluster Name")
|
||||
.description("Name of the ES cluster (for example, elasticsearch_brew). Defaults to 'elasticsearch'")
|
||||
.required(false)
|
||||
.addValidator(Validator.VALID)
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.defaultValue("elasticsearch")
|
||||
.build();
|
||||
|
||||
protected static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder()
|
||||
.name("ElasticSearch Hosts")
|
||||
.description("ElasticSearch Hosts, which should be comma separated and colon for hostname/port "
|
||||
+ "host1:port,host2:port,.... For example testcluster:9300. Note that this property is only "
|
||||
+ "needed when using a Transport client, it is ignored when using a Node client")
|
||||
.required(false)
|
||||
.addValidator(new ElasticsearchClientValidator())
|
||||
+ "host1:port,host2:port,.... For example testcluster:9300.")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(false)
|
||||
.addValidator(HOSTNAME_PORT_VALIDATOR)
|
||||
.build();
|
||||
|
||||
protected static final PropertyDescriptor PATH_HOME = new PropertyDescriptor.Builder()
|
||||
.name("ElasticSearch Path Home")
|
||||
.description("ElasticSearch node client requires that path.home be set. For example, "
|
||||
+ "/usr/share/elasticsearch or /usr/local/opt/elasticsearch for homebrew intall "
|
||||
+ "https://www.elastic.co/guide/en/elasticsearch/reference/current/setup-dir-layout.html")
|
||||
public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("SSL Context Service")
|
||||
.description("The SSL Context Service used to provide client certificate information for TLS/SSL "
|
||||
+ "connections. This service only applies if the Shield plugin is available.")
|
||||
.required(false)
|
||||
.addValidator(new ElasticsearchClientValidator())
|
||||
.identifiesControllerService(SSLContextService.class)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_SHIELD_LOCATION = new PropertyDescriptor.Builder()
|
||||
.name("Shield Plugin Filename")
|
||||
.description("Specifies the path to the JAR for the Elasticsearch Shield plugin. "
|
||||
+ "If the Elasticsearch cluster has been secured with the Shield plugin, then the Shield plugin "
|
||||
+ "JAR must also be available to this processor. Note: Do NOT place the Shield JAR into NiFi's "
|
||||
+ "lib/ directory, doing so will prevent the Shield plugin from being loaded.")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
|
||||
.name("Username")
|
||||
.description("Username to access the Elasticsearch cluster")
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
|
||||
.name("Password")
|
||||
.description("Password to access the Elasticsearch cluster")
|
||||
.required(false)
|
||||
.sensitive(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder()
|
||||
.name("ElasticSearch Ping Timeout")
|
||||
.description("The ping timeout used to determine when a node is unreachable. " +
|
||||
.description("The ping timeout used to determine when a node is unreachable. " +
|
||||
"For example, 5s (5 seconds). If non-local recommended is 30s")
|
||||
.required(true)
|
||||
.defaultValue("5s")
|
||||
|
@ -102,83 +133,175 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
|
|||
|
||||
protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder()
|
||||
.name("Sampler Interval")
|
||||
.description("Node sampler interval. For example, 5s (5 seconds) If non-local recommended is 30s")
|
||||
.description("How often to sample / ping the nodes listed and connected. For example, 5s (5 seconds). "
|
||||
+ "If non-local recommended is 30s.")
|
||||
.required(true)
|
||||
.defaultValue("5s")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
protected static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
|
||||
.name("Character Set")
|
||||
.description("Specifies the character set of the document data.")
|
||||
.required(true)
|
||||
.defaultValue("UTF-8")
|
||||
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
|
||||
.build();
|
||||
|
||||
protected Client esClient;
|
||||
protected AtomicReference<Client> esClient = new AtomicReference<>();
|
||||
protected List<InetSocketAddress> esHosts;
|
||||
protected String authToken;
|
||||
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
|
||||
Set<ValidationResult> results = new HashSet<>();
|
||||
|
||||
// Ensure that if username or password is set, then the other is too
|
||||
Map<PropertyDescriptor, String> propertyMap = validationContext.getProperties();
|
||||
if (StringUtils.isEmpty(propertyMap.get(USERNAME)) != StringUtils.isEmpty(propertyMap.get(PASSWORD))) {
|
||||
results.add(new ValidationResult.Builder().valid(false).explanation(
|
||||
"If username or password is specified, then the other must be specified as well").build());
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
public void setup(ProcessContext context) {
|
||||
// Create the client if one does not already exist
|
||||
createElasticsearchClient(context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiate ElasticSearch Client
|
||||
* Instantiate ElasticSearch Client. This chould be called by subclasses' @OnScheduled method to create a client
|
||||
* if one does not yet exist. If called when scheduled, closeClient() should be called by the subclasses' @OnStopped
|
||||
* method so the client will be destroyed when the processor is stopped.
|
||||
*
|
||||
* @param context
|
||||
* @throws IOException
|
||||
* @param context The context for this processor
|
||||
* @throws ProcessException if an error occurs while creating an Elasticsearch client
|
||||
*/
|
||||
@OnScheduled
|
||||
public void createClient(ProcessContext context) throws IOException {
|
||||
protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
|
||||
|
||||
ProcessorLog log = getLogger();
|
||||
if (esClient != null) {
|
||||
closeClient();
|
||||
if (esClient.get() != null) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("Creating ElasticSearch Client");
|
||||
|
||||
log.debug("Creating ElasticSearch Client");
|
||||
try {
|
||||
final String clusterType = context.getProperty(CLIENT_TYPE).toString();
|
||||
final String clusterName = context.getProperty(CLUSTER_NAME).toString();
|
||||
final String pingTimeout = context.getProperty(PING_TIMEOUT).toString();
|
||||
final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).toString();
|
||||
final String clusterName = context.getProperty(CLUSTER_NAME).getValue();
|
||||
final String pingTimeout = context.getProperty(PING_TIMEOUT).getValue();
|
||||
final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).getValue();
|
||||
final String username = context.getProperty(USERNAME).getValue();
|
||||
final String password = context.getProperty(PASSWORD).getValue();
|
||||
|
||||
if ("transport".equals(clusterType)) {
|
||||
final SSLContextService sslService =
|
||||
context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||
|
||||
//create new transport client
|
||||
Settings settings = Settings.settingsBuilder()
|
||||
.put("cluster.name", clusterName)
|
||||
.put("client.transport.ping_timeout", pingTimeout)
|
||||
.put("client.transport.nodes_sampler_interval", samplerInterval)
|
||||
.build();
|
||||
Settings.Builder settingsBuilder = Settings.settingsBuilder()
|
||||
.put("cluster.name", clusterName)
|
||||
.put("client.transport.ping_timeout", pingTimeout)
|
||||
.put("client.transport.nodes_sampler_interval", samplerInterval);
|
||||
|
||||
TransportClient transportClient = TransportClient.builder().settings(settings).build();
|
||||
String shieldUrl = context.getProperty(PROP_SHIELD_LOCATION).getValue();
|
||||
if (sslService != null) {
|
||||
settingsBuilder.put("shield.transport.ssl", "true")
|
||||
.put("shield.ssl.keystore.path", sslService.getKeyStoreFile())
|
||||
.put("shield.ssl.keystore.password", sslService.getKeyStorePassword())
|
||||
.put("shield.ssl.truststore.path", sslService.getTrustStoreFile())
|
||||
.put("shield.ssl.truststore.password", sslService.getTrustStorePassword());
|
||||
}
|
||||
|
||||
final String hosts = context.getProperty(HOSTS).toString();
|
||||
esHosts = GetEsHosts(hosts);
|
||||
// Set username and password for Shield
|
||||
if (!StringUtils.isEmpty(username)) {
|
||||
StringBuffer shieldUser = new StringBuffer(username);
|
||||
if (!StringUtils.isEmpty(password)) {
|
||||
shieldUser.append(":");
|
||||
shieldUser.append(password);
|
||||
}
|
||||
settingsBuilder.put("shield.user", shieldUser);
|
||||
|
||||
if (esHosts != null) {
|
||||
for (final InetSocketAddress host : esHosts) {
|
||||
}
|
||||
|
||||
TransportClient transportClient = getTransportClient(settingsBuilder, shieldUrl, username, password);
|
||||
|
||||
final String hosts = context.getProperty(HOSTS).getValue();
|
||||
esHosts = getEsHosts(hosts);
|
||||
|
||||
if (esHosts != null) {
|
||||
for (final InetSocketAddress host : esHosts) {
|
||||
try {
|
||||
transportClient.addTransportAddress(new InetSocketTransportAddress(host));
|
||||
} catch (IllegalArgumentException iae) {
|
||||
log.error("Could not add transport address {}", new Object[]{host});
|
||||
}
|
||||
}
|
||||
esClient = transportClient;
|
||||
} else if ("node".equals(clusterType)) {
|
||||
|
||||
final String pathHome = context.getProperty(PATH_HOME).toString();
|
||||
//create new node client
|
||||
Settings settings = Settings.settingsBuilder()
|
||||
.put("path.home", pathHome)
|
||||
.build();
|
||||
|
||||
esClient = NodeBuilder.nodeBuilder().clusterName(clusterName).settings(settings).node().client();
|
||||
}
|
||||
esClient.set(transportClient);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to create Elasticsearch client due to {}", new Object[]{e}, e);
|
||||
throw e;
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
|
||||
protected TransportClient getTransportClient(Settings.Builder settingsBuilder, String shieldUrl,
|
||||
String username, String password)
|
||||
throws MalformedURLException {
|
||||
|
||||
// Create new transport client using the Builder pattern
|
||||
TransportClient.Builder builder = TransportClient.builder();
|
||||
|
||||
// See if the Elasticsearch Shield JAR location was specified, and add the plugin if so. Also create the
|
||||
// authorization token if username and password are supplied.
|
||||
final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
|
||||
if (!StringUtils.isBlank(shieldUrl)) {
|
||||
ClassLoader shieldClassLoader =
|
||||
new URLClassLoader(new URL[]{new File(shieldUrl).toURI().toURL()}, this.getClass().getClassLoader());
|
||||
Thread.currentThread().setContextClassLoader(shieldClassLoader);
|
||||
|
||||
try {
|
||||
Class shieldPluginClass = Class.forName("org.elasticsearch.shield.ShieldPlugin", true, shieldClassLoader);
|
||||
builder = builder.addPlugin(shieldPluginClass);
|
||||
|
||||
if (!StringUtils.isEmpty(username) && !StringUtils.isEmpty(password)) {
|
||||
|
||||
// Need a couple of classes from the Shield plugin to build the token
|
||||
Class usernamePasswordTokenClass =
|
||||
Class.forName("org.elasticsearch.shield.authc.support.UsernamePasswordToken", true, shieldClassLoader);
|
||||
|
||||
Class securedStringClass =
|
||||
Class.forName("org.elasticsearch.shield.authc.support.SecuredString", true, shieldClassLoader);
|
||||
|
||||
Constructor<?> securedStringCtor = securedStringClass.getConstructor(char[].class);
|
||||
Object securePasswordString = securedStringCtor.newInstance(password.toCharArray());
|
||||
|
||||
Method basicAuthHeaderValue = usernamePasswordTokenClass.getMethod("basicAuthHeaderValue", String.class, securedStringClass);
|
||||
authToken = (String) basicAuthHeaderValue.invoke(null, username, securePasswordString);
|
||||
}
|
||||
} catch (ClassNotFoundException
|
||||
| NoSuchMethodException
|
||||
| InstantiationException
|
||||
| IllegalAccessException
|
||||
| InvocationTargetException shieldLoadException) {
|
||||
getLogger().debug("Did not detect Elasticsearch Shield plugin, secure connections and/or authorization will not be available");
|
||||
}
|
||||
} else {
|
||||
getLogger().debug("No Shield plugin location specified, secure connections and/or authorization will not be available");
|
||||
}
|
||||
TransportClient transportClient = builder.settings(settingsBuilder.build()).build();
|
||||
Thread.currentThread().setContextClassLoader(originalClassLoader);
|
||||
return transportClient;
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispose of ElasticSearch client
|
||||
*/
|
||||
@OnStopped
|
||||
public final void closeClient() {
|
||||
if (esClient != null) {
|
||||
public void closeClient() {
|
||||
if (esClient.get() != null) {
|
||||
getLogger().info("Closing ElasticSearch Client");
|
||||
esClient.close();
|
||||
esClient = null;
|
||||
esClient.get().close();
|
||||
esClient.set(null);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -188,7 +311,7 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
|
|||
* @param hosts A comma-separated list of ElasticSearch hosts (host:port,host2:port2, etc.)
|
||||
* @return List of InetSocketAddresses for the ES hosts
|
||||
*/
|
||||
private List<InetSocketAddress> GetEsHosts(String hosts) {
|
||||
private List<InetSocketAddress> getEsHosts(String hosts) {
|
||||
|
||||
if (hosts == null) {
|
||||
return null;
|
||||
|
@ -199,57 +322,13 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
|
|||
for (String item : esList) {
|
||||
|
||||
String[] addresses = item.split(":");
|
||||
final String hostName = addresses[0];
|
||||
final int port = Integer.parseInt(addresses[1]);
|
||||
final String hostName = addresses[0].trim();
|
||||
final int port = Integer.parseInt(addresses[1].trim());
|
||||
|
||||
esHosts.add(new InetSocketAddress(hostName, port));
|
||||
}
|
||||
|
||||
return esHosts;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Source for ElasticSearch. The string representation of the JSON object is returned as a byte array after
|
||||
* replacing newlines with spaces
|
||||
*
|
||||
* @param input a JSON object to be serialized to UTF-8
|
||||
* @return a byte array containing the UTF-8 representation (without newlines) of the JSON object
|
||||
*/
|
||||
public byte[] getSource(final JsonObject input) {
|
||||
String jsonString = input.toString();
|
||||
jsonString = jsonString.replace("\r\n", " ").replace('\n', ' ').replace('\r', ' ');
|
||||
return jsonString.getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
/**
|
||||
* A custom validator for the ElasticSearch properties list. For example, the hostnames property doesn't need to
|
||||
* be filled in for a Node client, as it joins the cluster by name. Alternatively if a Transport client
|
||||
*/
|
||||
protected static class ElasticsearchClientValidator implements Validator {
|
||||
|
||||
@Override
|
||||
public ValidationResult validate(String subject, String input, ValidationContext context) {
|
||||
// Only validate hosts if cluster type == Transport
|
||||
if (HOSTS.getName().equals(subject)) {
|
||||
PropertyValue clientTypeProperty = context.getProperty(CLIENT_TYPE);
|
||||
if (TRANSPORT_CLIENT.getValue().equals(clientTypeProperty.getValue())) {
|
||||
return StandardValidators.NON_EMPTY_VALIDATOR.validate(
|
||||
CLIENT_TYPE.getName(), clientTypeProperty.getValue(), context);
|
||||
}
|
||||
}
|
||||
|
||||
// Only validate Path home if client type == Node
|
||||
if (PATH_HOME.getName().equals(subject)) {
|
||||
PropertyValue clientTypeProperty = context.getProperty(CLIENT_TYPE);
|
||||
if (NODE_CLIENT.getValue().equals(clientTypeProperty.getValue())) {
|
||||
return StandardValidators.NON_EMPTY_VALIDATOR.validate(
|
||||
CLIENT_TYPE.getName(), clientTypeProperty.getValue(), context);
|
||||
}
|
||||
}
|
||||
|
||||
return VALID.validate(subject, input, context);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,210 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.elasticsearch;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.logging.ProcessorLog;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.get.GetRequestBuilder;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
||||
@EventDriven
|
||||
@SupportsBatching
|
||||
@Tags({"elasticsearch", "fetch", "read", "get"})
|
||||
@CapabilityDescription("Retrieves a document from Elasticsearch using the specified connection properties and the "
|
||||
+ "identifier of the document to retrieve. If the cluster has been configured for authorization and/or secure "
|
||||
+ "transport (SSL/TLS) and the Shield plugin is available, secure connections can be made. This processor "
|
||||
+ "supports Elasticsearch 2.x clusters.")
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute = "filename", description = "The filename attributes is set to the document identifier"),
|
||||
@WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
|
||||
@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type")
|
||||
})
|
||||
public class FetchElasticsearch extends AbstractElasticsearchProcessor {
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
|
||||
.description("All FlowFiles that are read from Elasticsearch are routed to this relationship").build();
|
||||
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
|
||||
.description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship").build();
|
||||
|
||||
public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
|
||||
.description("A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may succeed")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found")
|
||||
.description("A FlowFile is routed to this relationship if the specified document does not exist in the Elasticsearch cluster")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor DOC_ID = new PropertyDescriptor.Builder()
|
||||
.name("Document Identifier")
|
||||
.description("The identifier for the document to be fetched")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
|
||||
.name("Index")
|
||||
.description("The name of the index to read from")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
|
||||
.name("Type")
|
||||
.description("The type of this document (used by Elasticsearch for indexing and searching)")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
relationships.add(REL_SUCCESS);
|
||||
relationships.add(REL_FAILURE);
|
||||
relationships.add(REL_RETRY);
|
||||
relationships.add(REL_NOT_FOUND);
|
||||
return Collections.unmodifiableSet(relationships);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
descriptors.add(CLUSTER_NAME);
|
||||
descriptors.add(HOSTS);
|
||||
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
|
||||
descriptors.add(PROP_SHIELD_LOCATION);
|
||||
descriptors.add(USERNAME);
|
||||
descriptors.add(PASSWORD);
|
||||
descriptors.add(PING_TIMEOUT);
|
||||
descriptors.add(SAMPLER_INTERVAL);
|
||||
descriptors.add(DOC_ID);
|
||||
descriptors.add(INDEX);
|
||||
descriptors.add(TYPE);
|
||||
descriptors.add(CHARSET);
|
||||
|
||||
return Collections.unmodifiableList(descriptors);
|
||||
}
|
||||
|
||||
|
||||
@OnScheduled
|
||||
public void setup(ProcessContext context) {
|
||||
super.setup(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
|
||||
FlowFile flowFile = session.get();
|
||||
if (flowFile == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
|
||||
final String docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
|
||||
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
|
||||
final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
|
||||
|
||||
final ProcessorLog logger = getLogger();
|
||||
try {
|
||||
|
||||
logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId});
|
||||
GetRequestBuilder getRequestBuilder = esClient.get().prepareGet(index, docType, docId);
|
||||
if (authToken != null) {
|
||||
getRequestBuilder.putHeader("Authorization", authToken);
|
||||
}
|
||||
final GetResponse getResponse = getRequestBuilder.execute().actionGet();
|
||||
|
||||
if (getResponse == null || !getResponse.isExists()) {
|
||||
logger.warn("Failed to read {}/{}/{} from Elasticsearch: Document not found",
|
||||
new Object[]{index, docType, docId});
|
||||
|
||||
// We couldn't find the document, so penalize it and send it to "not found"
|
||||
flowFile = session.penalize(flowFile);
|
||||
session.transfer(flowFile, REL_NOT_FOUND);
|
||||
} else {
|
||||
flowFile = session.putAttribute(flowFile, "filename", docId);
|
||||
flowFile = session.putAttribute(flowFile, "es.index", index);
|
||||
flowFile = session.putAttribute(flowFile, "es.type", docType);
|
||||
flowFile = session.write(flowFile, new OutputStreamCallback() {
|
||||
@Override
|
||||
public void process(OutputStream out) throws IOException {
|
||||
out.write(getResponse.getSourceAsString().getBytes(charset));
|
||||
}
|
||||
});
|
||||
logger.debug("Elasticsearch document " + docId + " fetched, routing to success");
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
}
|
||||
} catch (NoNodeAvailableException
|
||||
| ElasticsearchTimeoutException
|
||||
| ReceiveTimeoutTransportException
|
||||
| NodeClosedException exceptionToRetry) {
|
||||
logger.error("Failed to read into Elasticsearch due to {}, this may indicate an error in configuration "
|
||||
+ "(hosts, username/password, etc.). Routing to retry",
|
||||
new Object[]{exceptionToRetry.getLocalizedMessage()}, exceptionToRetry);
|
||||
session.transfer(flowFile, REL_RETRY);
|
||||
context.yield();
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to read {} from Elasticsearch due to {}", new Object[]{flowFile, e.getLocalizedMessage()}, e);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
context.yield();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispose of ElasticSearch client
|
||||
*/
|
||||
@OnStopped
|
||||
public void closeClient() {
|
||||
super.closeClient();
|
||||
}
|
||||
}
|
|
@ -16,11 +16,14 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.elasticsearch;
|
||||
|
||||
import com.google.gson.JsonObject;
|
||||
import com.google.gson.JsonParser;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.expression.AttributeExpression;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
|
@ -31,18 +34,18 @@ import org.apache.nifi.processor.Relationship;
|
|||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.InputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
|
||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
|
@ -50,9 +53,14 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
|
||||
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
||||
@EventDriven
|
||||
@SupportsBatching
|
||||
@Tags({"elasticsearch", "insert", "update", "write", "put"})
|
||||
@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch")
|
||||
@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as "
|
||||
+ "the index to insert into and the type of the document. If the cluster has been configured for authorization "
|
||||
+ "and/or secure transport (SSL/TLS) and the Shield plugin is available, secure connections can be made. This processor "
|
||||
+ "supports Elasticsearch 2.x clusters.")
|
||||
public class PutElasticsearch extends AbstractElasticsearchProcessor {
|
||||
|
||||
static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
|
||||
|
@ -61,17 +69,16 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor {
|
|||
static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
|
||||
.description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build();
|
||||
|
||||
static final Relationship REL_RETRY = new Relationship.Builder()
|
||||
.name("retry")
|
||||
static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
|
||||
.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder()
|
||||
.name("Identifier attribute")
|
||||
.name("Identifier Attribute")
|
||||
.description("The name of the attribute containing the identifier for each FlowFile")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
|
||||
|
@ -113,27 +120,35 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor {
|
|||
@Override
|
||||
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
descriptors.add(CLIENT_TYPE);
|
||||
descriptors.add(CLUSTER_NAME);
|
||||
descriptors.add(HOSTS);
|
||||
descriptors.add(PATH_HOME);
|
||||
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
|
||||
descriptors.add(PROP_SHIELD_LOCATION);
|
||||
descriptors.add(USERNAME);
|
||||
descriptors.add(PASSWORD);
|
||||
descriptors.add(PING_TIMEOUT);
|
||||
descriptors.add(SAMPLER_INTERVAL);
|
||||
descriptors.add(ID_ATTRIBUTE);
|
||||
descriptors.add(INDEX);
|
||||
descriptors.add(TYPE);
|
||||
descriptors.add(CHARSET);
|
||||
descriptors.add(BATCH_SIZE);
|
||||
|
||||
return Collections.unmodifiableList(descriptors);
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void setup(ProcessContext context) {
|
||||
super.setup(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
|
||||
final String index = context.getProperty(INDEX).evaluateAttributeExpressions().getValue();
|
||||
final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue();
|
||||
final String docType = context.getProperty(TYPE).getValue();
|
||||
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
|
||||
final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
|
||||
|
||||
final List<FlowFile> flowFiles = session.get(batchSize);
|
||||
if (flowFiles.isEmpty()) {
|
||||
|
@ -143,22 +158,23 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor {
|
|||
final ProcessorLog logger = getLogger();
|
||||
|
||||
try {
|
||||
final BulkRequestBuilder bulk = esClient.prepareBulk();
|
||||
final BulkRequestBuilder bulk = esClient.get().prepareBulk();
|
||||
if (authToken != null) {
|
||||
bulk.putHeader("Authorization", authToken);
|
||||
}
|
||||
for (FlowFile file : flowFiles) {
|
||||
final String id = file.getAttribute(id_attribute);
|
||||
if (id == null) {
|
||||
getLogger().error("no value in identifier attribute {}", new Object[]{id_attribute});
|
||||
throw new ProcessException("No value in identifier attribute " + id_attribute);
|
||||
logger.error("No value in identifier attribute {} for {}", new Object[]{id_attribute, file});
|
||||
session.transfer(file, REL_FAILURE);
|
||||
}
|
||||
session.read(file, new InputStreamCallback() {
|
||||
@Override
|
||||
public void process(final InputStream in) throws IOException {
|
||||
|
||||
final InputStreamReader input = new InputStreamReader(in);
|
||||
final JsonParser parser = new JsonParser();
|
||||
final JsonObject json = parser.parse(input).getAsJsonObject();
|
||||
bulk.add(esClient.prepareIndex(index, docType, id)
|
||||
.setSource(getSource(json)));
|
||||
String json = IOUtils.toString(in, charset)
|
||||
.replace("\r\n", " ").replace('\n', ' ').replace('\r', ' ');
|
||||
bulk.add(esClient.get().prepareIndex(index, docType, id)
|
||||
.setSource(json.getBytes(charset)));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -169,58 +185,47 @@ public class PutElasticsearch extends AbstractElasticsearchProcessor {
|
|||
final FlowFile flowFile = flowFiles.get(item.getItemId());
|
||||
if (item.isFailed()) {
|
||||
logger.error("Failed to insert {} into Elasticsearch due to {}",
|
||||
new Object[]{flowFile, item.getFailure()}, new Exception());
|
||||
new Object[]{flowFile, item.getFailure().getMessage()});
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
|
||||
} else {
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (final FlowFile flowFile : flowFiles) {
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
}
|
||||
session.transfer(flowFiles, REL_SUCCESS);
|
||||
}
|
||||
|
||||
|
||||
} catch (NoNodeAvailableException nne) {
|
||||
logger.error("Failed to insert {} into Elasticsearch No Node Available {}", new Object[]{nne}, nne);
|
||||
for (final FlowFile flowFile : flowFiles) {
|
||||
session.transfer(flowFile, REL_RETRY);
|
||||
}
|
||||
} catch (NoNodeAvailableException
|
||||
| ElasticsearchTimeoutException
|
||||
| ReceiveTimeoutTransportException
|
||||
| NodeClosedException exceptionToRetry) {
|
||||
|
||||
// Authorization errors and other problems are often returned as NoNodeAvailableExceptions without a
|
||||
// traceable cause. However the cause seems to be logged, just not available to this caught exception.
|
||||
// Since the error message will show up as a bulletin, we make specific mention to check the logs for
|
||||
// more details.
|
||||
logger.error("Failed to insert into Elasticsearch due to {}. More detailed information may be available in " +
|
||||
"the NiFi logs.",
|
||||
new Object[]{exceptionToRetry.getLocalizedMessage()}, exceptionToRetry);
|
||||
session.transfer(flowFiles, REL_RETRY);
|
||||
context.yield();
|
||||
|
||||
} catch (ElasticsearchTimeoutException ete) {
|
||||
logger.error("Failed to insert {} into Elasticsearch Timeout to {}", new Object[]{ete}, ete);
|
||||
for (final FlowFile flowFile : flowFiles) {
|
||||
session.transfer(flowFile, REL_RETRY);
|
||||
}
|
||||
} catch (Exception exceptionToFail) {
|
||||
logger.error("Failed to insert into Elasticsearch due to {}",
|
||||
new Object[]{exceptionToFail.getLocalizedMessage()}, exceptionToFail);
|
||||
|
||||
session.transfer(flowFiles, REL_FAILURE);
|
||||
context.yield();
|
||||
|
||||
} catch (ReceiveTimeoutTransportException rtt) {
|
||||
logger.error("Failed to insert {} into Elasticsearch ReceiveTimeoutTransportException to {}", new Object[]{rtt}, rtt);
|
||||
for (final FlowFile flowFile : flowFiles) {
|
||||
session.transfer(flowFile, REL_RETRY);
|
||||
}
|
||||
context.yield();
|
||||
|
||||
} catch (ElasticsearchParseException esp) {
|
||||
logger.error("Failed to insert {} into Elasticsearch Parse Exception {}", new Object[]{esp}, esp);
|
||||
|
||||
for (final FlowFile flowFile : flowFiles) {
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
}
|
||||
context.yield();
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to insert {} into Elasticsearch due to {}", new Object[]{e}, e);
|
||||
|
||||
for (final FlowFile flowFile : flowFiles) {
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
}
|
||||
context.yield();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispose of ElasticSearch client
|
||||
*/
|
||||
@OnStopped
|
||||
public void closeClient() {
|
||||
super.closeClient();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,4 +12,5 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
org.apache.nifi.processors.elasticsearch.FetchElasticsearch
|
||||
org.apache.nifi.processors.elasticsearch.PutElasticsearch
|
||||
|
|
|
@ -0,0 +1,418 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.elasticsearch;
|
||||
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.MockProcessContext;
|
||||
import org.apache.nifi.util.MockProcessorInitializationContext;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ListenableActionFuture;
|
||||
import org.elasticsearch.action.get.GetAction;
|
||||
import org.elasticsearch.action.get.GetRequestBuilder;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.support.AdapterActionFuture;
|
||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.MalformedURLException;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestFetchElasticsearch {
|
||||
|
||||
private InputStream docExample;
|
||||
private TestRunner runner;
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
ClassLoader classloader = Thread.currentThread().getContextClassLoader();
|
||||
docExample = classloader.getResourceAsStream("DocumentExample.json");
|
||||
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() {
|
||||
runner = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchElasticsearchOnTrigger() throws IOException {
|
||||
runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(true)); // all docs are found
|
||||
runner.setValidateExpressionUsage(true);
|
||||
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s");
|
||||
|
||||
runner.setProperty(FetchElasticsearch.INDEX, "doc");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(FetchElasticsearch.TYPE, "status");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_SUCCESS, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearch.REL_SUCCESS).get(0);
|
||||
assertNotNull(out);
|
||||
out.assertAttributeEquals("doc_id", "28039652140");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchElasticsearchOnTriggerWithFailures() throws IOException {
|
||||
runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(false)); // simulate doc not found
|
||||
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s");
|
||||
runner.setProperty(FetchElasticsearch.INDEX, "doc");
|
||||
runner.setProperty(FetchElasticsearch.TYPE, "status");
|
||||
runner.setValidateExpressionUsage(true);
|
||||
runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
// This test generates a "document not found"
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_NOT_FOUND, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearch.REL_NOT_FOUND).get(0);
|
||||
assertNotNull(out);
|
||||
out.assertAttributeEquals("doc_id", "28039652140");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchElasticsearchWithBadHosts() throws IOException {
|
||||
runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(false)); // simulate doc not found
|
||||
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "http://127.0.0.1:9300,127.0.0.2:9300");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s");
|
||||
runner.setProperty(FetchElasticsearch.INDEX, "doc");
|
||||
runner.setProperty(FetchElasticsearch.TYPE, "status");
|
||||
runner.setValidateExpressionUsage(true);
|
||||
runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
|
||||
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchElasticsearchOnTriggerWithExceptions() throws IOException {
|
||||
FetchElasticsearchTestProcessor processor = new FetchElasticsearchTestProcessor(true);
|
||||
runner = TestRunners.newTestRunner(processor);
|
||||
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s");
|
||||
runner.setProperty(FetchElasticsearch.INDEX, "doc");
|
||||
runner.setProperty(FetchElasticsearch.TYPE, "status");
|
||||
runner.setValidateExpressionUsage(true);
|
||||
runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
|
||||
|
||||
// No Node Available exception
|
||||
processor.setExceptionToThrow(new NoNodeAvailableException("test"));
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
|
||||
runner.clearTransferState();
|
||||
|
||||
// Elasticsearch Timeout exception
|
||||
processor.setExceptionToThrow(new ElasticsearchTimeoutException("test"));
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652141");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
|
||||
runner.clearTransferState();
|
||||
|
||||
// Receive Timeout Transport exception
|
||||
processor.setExceptionToThrow(new ReceiveTimeoutTransportException(mock(StreamInput.class)));
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652141");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
|
||||
runner.clearTransferState();
|
||||
|
||||
// Node Closed exception
|
||||
processor.setExceptionToThrow(new NodeClosedException(mock(StreamInput.class)));
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652141");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
|
||||
runner.clearTransferState();
|
||||
|
||||
// Elasticsearch Parse exception
|
||||
processor.setExceptionToThrow(new ElasticsearchParseException("test"));
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652141");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
// This test generates an exception on execute(),routes to failure
|
||||
runner.assertTransferCount(FetchElasticsearch.REL_FAILURE, 1);
|
||||
}
|
||||
|
||||
@Test(expected = ProcessException.class)
|
||||
public void testCreateElasticsearchClientWithException() throws ProcessException {
|
||||
FetchElasticsearchTestProcessor processor = new FetchElasticsearchTestProcessor(true) {
|
||||
@Override
|
||||
protected TransportClient getTransportClient(Settings.Builder settingsBuilder, String shieldUrl,
|
||||
String username, String password)
|
||||
throws MalformedURLException {
|
||||
throw new MalformedURLException();
|
||||
}
|
||||
};
|
||||
|
||||
MockProcessContext context = new MockProcessContext(processor);
|
||||
processor.initialize(new MockProcessorInitializationContext(processor, context));
|
||||
processor.callCreateElasticsearchClient(context);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetupSecureClient() throws Exception {
|
||||
FetchElasticsearchTestProcessor processor = new FetchElasticsearchTestProcessor(true);
|
||||
runner = TestRunners.newTestRunner(processor);
|
||||
SSLContextService sslService = mock(SSLContextService.class);
|
||||
when(sslService.getIdentifier()).thenReturn("ssl-context");
|
||||
runner.addControllerService("ssl-context", sslService);
|
||||
runner.enableControllerService(sslService);
|
||||
runner.setProperty(FetchElasticsearch.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s");
|
||||
runner.setProperty(FetchElasticsearch.INDEX, "doc");
|
||||
runner.setProperty(FetchElasticsearch.TYPE, "status");
|
||||
runner.setValidateExpressionUsage(true);
|
||||
runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
|
||||
|
||||
// Allow time for the controller service to fully initialize
|
||||
Thread.sleep(500);
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A Test class that extends the processor in order to inject/mock behavior
|
||||
*/
|
||||
private static class FetchElasticsearchTestProcessor extends FetchElasticsearch {
|
||||
boolean documentExists = true;
|
||||
Exception exceptionToThrow = null;
|
||||
|
||||
public FetchElasticsearchTestProcessor(boolean documentExists) {
|
||||
this.documentExists = documentExists;
|
||||
}
|
||||
|
||||
public void setExceptionToThrow(Exception exceptionToThrow) {
|
||||
this.exceptionToThrow = exceptionToThrow;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TransportClient getTransportClient(Settings.Builder settingsBuilder, String shieldUrl,
|
||||
String username, String password)
|
||||
throws MalformedURLException {
|
||||
TransportClient mockClient = mock(TransportClient.class);
|
||||
GetRequestBuilder getRequestBuilder = spy(new GetRequestBuilder(mockClient, GetAction.INSTANCE));
|
||||
if (exceptionToThrow != null) {
|
||||
doThrow(exceptionToThrow).when(getRequestBuilder).execute();
|
||||
} else {
|
||||
doReturn(new MockGetRequestBuilderExecutor(documentExists)).when(getRequestBuilder).execute();
|
||||
}
|
||||
when(mockClient.prepareGet(anyString(), anyString(), anyString())).thenReturn(getRequestBuilder);
|
||||
|
||||
return mockClient;
|
||||
}
|
||||
|
||||
public void callCreateElasticsearchClient(ProcessContext context) {
|
||||
createElasticsearchClient(context);
|
||||
}
|
||||
|
||||
private static class MockGetRequestBuilderExecutor
|
||||
extends AdapterActionFuture<GetResponse, ActionListener<GetResponse>>
|
||||
implements ListenableActionFuture<GetResponse> {
|
||||
|
||||
boolean documentExists = true;
|
||||
|
||||
public MockGetRequestBuilderExecutor(boolean documentExists) {
|
||||
this.documentExists = documentExists;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected GetResponse convert(ActionListener<GetResponse> bulkResponseActionListener) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addListener(ActionListener<GetResponse> actionListener) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetResponse get() throws InterruptedException, ExecutionException {
|
||||
GetResponse response = mock(GetResponse.class);
|
||||
when(response.isExists()).thenReturn(documentExists);
|
||||
when(response.getSourceAsBytes()).thenReturn("Success".getBytes());
|
||||
when(response.getSourceAsString()).thenReturn("Success");
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetResponse actionGet() {
|
||||
try {
|
||||
return get();
|
||||
} catch (Exception e) {
|
||||
fail(e.getMessage());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Integration test section below
|
||||
//
|
||||
// The tests below are meant to run on real ES instances, and are thus @Ignored during normal test execution.
|
||||
// However if you wish to execute them as part of a test phase, comment out the @Ignored line for each
|
||||
// desired test.
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/**
|
||||
* Tests basic ES functionality against a local or test ES cluster
|
||||
*/
|
||||
@Test
|
||||
@Ignore("Comment this out if you want to run against local or test ES")
|
||||
public void testFetchElasticsearchBasic() {
|
||||
System.out.println("Starting test " + new Object() {
|
||||
}.getClass().getEnclosingMethod().getName());
|
||||
final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearch());
|
||||
runner.setValidateExpressionUsage(true);
|
||||
|
||||
//Local Cluster - Mac pulled from brew
|
||||
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s");
|
||||
|
||||
runner.setProperty(FetchElasticsearch.INDEX, "doc");
|
||||
|
||||
runner.setProperty(FetchElasticsearch.TYPE, "status");
|
||||
runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
|
||||
|
||||
runner.enqueue(docExample);
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("Comment this out if you want to run against local or test ES")
|
||||
public void testFetchElasticsearchBatch() throws IOException {
|
||||
System.out.println("Starting test " + new Object() {
|
||||
}.getClass().getEnclosingMethod().getName());
|
||||
final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearch());
|
||||
runner.setValidateExpressionUsage(true);
|
||||
|
||||
//Local Cluster - Mac pulled from brew
|
||||
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s");
|
||||
runner.setProperty(FetchElasticsearch.INDEX, "doc");
|
||||
|
||||
runner.setProperty(FetchElasticsearch.TYPE, "status");
|
||||
runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
|
||||
runner.assertValid();
|
||||
|
||||
|
||||
String message = convertStreamToString(docExample);
|
||||
for (int i = 0; i < 100; i++) {
|
||||
|
||||
long newId = 28039652140L + i;
|
||||
final String newStrId = Long.toString(newId);
|
||||
runner.enqueue(message.getBytes(), new HashMap<String, String>() {{
|
||||
put("doc_id", newStrId);
|
||||
}});
|
||||
|
||||
}
|
||||
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_SUCCESS, 100);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert an input stream to a stream
|
||||
*
|
||||
* @param is input the input stream
|
||||
* @return return the converted input stream as a string
|
||||
*/
|
||||
static String convertStreamToString(InputStream is) {
|
||||
java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
|
||||
return s.hasNext() ? s.next() : "";
|
||||
}
|
||||
}
|
|
@ -16,22 +16,27 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.elasticsearch;
|
||||
|
||||
import com.google.gson.JsonObject;
|
||||
import com.google.gson.JsonParser;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ListenableActionFuture;
|
||||
import org.elasticsearch.action.bulk.BulkAction;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.index.IndexAction;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.support.AdapterActionFuture;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
|
@ -46,21 +51,20 @@ import java.util.concurrent.ExecutionException;
|
|||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestPutElasticsearch {
|
||||
|
||||
private InputStream twitterExample;
|
||||
private InputStream docExample;
|
||||
private TestRunner runner;
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
ClassLoader classloader = Thread.currentThread().getContextClassLoader();
|
||||
twitterExample = classloader
|
||||
.getResourceAsStream("TweetExample.json");
|
||||
|
||||
docExample = classloader.getResourceAsStream("DocumentExample.json");
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -70,105 +74,151 @@ public class TestPutElasticsearch {
|
|||
|
||||
@Test
|
||||
public void testPutElasticSearchOnTrigger() throws IOException {
|
||||
runner = TestRunners.newTestRunner(new ElasticsearchTestProcessor(false)); // no failures
|
||||
runner.setValidateExpressionUsage(false);
|
||||
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
|
||||
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
|
||||
runner.setValidateExpressionUsage(true);
|
||||
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s");
|
||||
|
||||
runner.setProperty(PutElasticsearch.INDEX, "tweet");
|
||||
runner.setProperty(PutElasticsearch.INDEX, "doc");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(PutElasticsearch.TYPE, "status");
|
||||
runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
|
||||
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue(twitterExample, new HashMap<String, String>() {{
|
||||
put("tweet_id", "28039652140");
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
|
||||
assertNotNull(out);
|
||||
out.assertAttributeEquals("tweet_id", "28039652140");
|
||||
out.assertAttributeEquals("doc_id", "28039652140");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutElasticSearchOnTriggerWithFailures() throws IOException {
|
||||
runner = TestRunners.newTestRunner(new ElasticsearchTestProcessor(true)); // simulate failures
|
||||
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures
|
||||
runner.setValidateExpressionUsage(false);
|
||||
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s");
|
||||
runner.setProperty(PutElasticsearch.INDEX, "tweet");
|
||||
runner.setProperty(PutElasticsearch.INDEX, "doc");
|
||||
runner.setProperty(PutElasticsearch.TYPE, "status");
|
||||
runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
|
||||
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
|
||||
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
|
||||
|
||||
runner.enqueue(twitterExample, new HashMap<String, String>() {{
|
||||
put("tweet_id", "28039652140");
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_FAILURE, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_FAILURE).get(0);
|
||||
assertNotNull(out);
|
||||
out.assertAttributeEquals("tweet_id", "28039652140");
|
||||
out.assertAttributeEquals("doc_id", "28039652140");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutElasticSearchOnTriggerNode() throws IOException {
|
||||
runner = TestRunners.newTestRunner(new ElasticsearchTestProcessor(false)); // no failures
|
||||
runner.setValidateExpressionUsage(false);
|
||||
runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE,"node");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
|
||||
public void testPutElasticsearchOnTriggerWithExceptions() throws IOException {
|
||||
PutElasticsearchTestProcessor processor = new PutElasticsearchTestProcessor(false);
|
||||
runner = TestRunners.newTestRunner(processor);
|
||||
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s");
|
||||
|
||||
runner.setProperty(PutElasticsearch.INDEX, "tweet");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(PutElasticsearch.INDEX, "doc");
|
||||
runner.setProperty(PutElasticsearch.TYPE, "status");
|
||||
runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
|
||||
runner.assertValid();
|
||||
runner.setValidateExpressionUsage(true);
|
||||
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
|
||||
|
||||
runner.enqueue(twitterExample, new HashMap<String, String>() {{
|
||||
put("tweet_id", "28039652141");
|
||||
// No Node Available exception
|
||||
processor.setExceptionToThrow(new NoNodeAvailableException("test"));
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
|
||||
assertNotNull(out);
|
||||
out.assertAttributeEquals("tweet_id", "28039652141");
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
|
||||
runner.clearTransferState();
|
||||
|
||||
// Elasticsearch Timeout exception
|
||||
processor.setExceptionToThrow(new ElasticsearchTimeoutException("test"));
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652141");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
|
||||
runner.clearTransferState();
|
||||
|
||||
// Receive Timeout Transport exception
|
||||
processor.setExceptionToThrow(new ReceiveTimeoutTransportException(mock(StreamInput.class)));
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652142");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
|
||||
runner.clearTransferState();
|
||||
|
||||
// Node Closed exception
|
||||
processor.setExceptionToThrow(new NodeClosedException(mock(StreamInput.class)));
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652143");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
|
||||
runner.clearTransferState();
|
||||
|
||||
// Elasticsearch Parse exception
|
||||
processor.setExceptionToThrow(new ElasticsearchParseException("test"));
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652144");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
// This test generates an exception on execute(),routes to failure
|
||||
runner.assertTransferCount(PutElasticsearch.REL_FAILURE, 1);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A Test class that extends the processor in order to inject/mock behavior
|
||||
*/
|
||||
private static class ElasticsearchTestProcessor extends PutElasticsearch {
|
||||
private static class PutElasticsearchTestProcessor extends PutElasticsearch {
|
||||
boolean responseHasFailures = false;
|
||||
Exception exceptionToThrow = null;
|
||||
|
||||
public ElasticsearchTestProcessor(boolean responseHasFailures) {
|
||||
public PutElasticsearchTestProcessor(boolean responseHasFailures) {
|
||||
this.responseHasFailures = responseHasFailures;
|
||||
}
|
||||
|
||||
@Override
|
||||
@OnScheduled
|
||||
public void createClient(ProcessContext context) throws IOException {
|
||||
esClient = mock(Client.class);
|
||||
BulkRequestBuilder bulkRequestBuilder = spy(new BulkRequestBuilder(esClient, BulkAction.INSTANCE));
|
||||
doReturn(new MockBulkRequestBuilderExecutor(responseHasFailures)).when(bulkRequestBuilder).execute();
|
||||
when(esClient.prepareBulk()).thenReturn(bulkRequestBuilder);
|
||||
public void setExceptionToThrow(Exception exceptionToThrow) {
|
||||
this.exceptionToThrow = exceptionToThrow;
|
||||
}
|
||||
|
||||
IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(esClient, IndexAction.INSTANCE);
|
||||
when(esClient.prepareIndex(anyString(), anyString(), anyString())).thenReturn(indexRequestBuilder);
|
||||
@Override
|
||||
public void createElasticsearchClient(ProcessContext context) throws ProcessException {
|
||||
Client mockClient = mock(Client.class);
|
||||
BulkRequestBuilder bulkRequestBuilder = spy(new BulkRequestBuilder(mockClient, BulkAction.INSTANCE));
|
||||
if (exceptionToThrow != null) {
|
||||
doThrow(exceptionToThrow).when(bulkRequestBuilder).execute();
|
||||
} else {
|
||||
doReturn(new MockBulkRequestBuilderExecutor(responseHasFailures)).when(bulkRequestBuilder).execute();
|
||||
}
|
||||
when(mockClient.prepareBulk()).thenReturn(bulkRequestBuilder);
|
||||
|
||||
IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(mockClient, IndexAction.INSTANCE);
|
||||
when(mockClient.prepareIndex(anyString(), anyString(), anyString())).thenReturn(indexRequestBuilder);
|
||||
|
||||
esClient.set(mockClient);
|
||||
}
|
||||
|
||||
private static class MockBulkRequestBuilderExecutor
|
||||
|
@ -195,6 +245,10 @@ public class TestPutElasticsearch {
|
|||
public BulkResponse get() throws InterruptedException, ExecutionException {
|
||||
BulkResponse response = mock(BulkResponse.class);
|
||||
when(response.hasFailures()).thenReturn(responseHasFailures);
|
||||
BulkItemResponse item = mock(BulkItemResponse.class);
|
||||
when(item.getItemId()).thenReturn(1);
|
||||
when(item.isFailed()).thenReturn(true);
|
||||
when(response.getItems()).thenReturn(new BulkItemResponse[]{item});
|
||||
return response;
|
||||
}
|
||||
|
||||
|
@ -212,72 +266,37 @@ public class TestPutElasticsearch {
|
|||
|
||||
/**
|
||||
* Tests basic ES functionality against a local or test ES cluster
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
@Ignore("Comment this out if you want to run against local or test ES")
|
||||
public void testPutElasticSearchBasic() throws IOException {
|
||||
public void testPutElasticSearchBasic() {
|
||||
System.out.println("Starting test " + new Object() {
|
||||
}.getClass().getEnclosingMethod().getName());
|
||||
final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearch());
|
||||
runner.setValidateExpressionUsage(false);
|
||||
|
||||
//Local Cluster - Mac pulled from brew
|
||||
runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE, "transport");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s");
|
||||
|
||||
runner.setProperty(PutElasticsearch.INDEX, "tweet");
|
||||
runner.setProperty(PutElasticsearch.INDEX, "doc");
|
||||
runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
|
||||
|
||||
runner.setProperty(PutElasticsearch.TYPE, "status");
|
||||
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
|
||||
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue(twitterExample, new HashMap<String, String>() {{
|
||||
put("tweet_id", "28039652140");
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
|
||||
|
||||
runner.enqueue(twitterExample);
|
||||
runner.enqueue(docExample);
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("Comment this out if you want to run against local or test ES")
|
||||
public void testPutElasticSearchBasicNode() throws IOException {
|
||||
System.out.println("Starting test " + new Object() {
|
||||
}.getClass().getEnclosingMethod().getName());
|
||||
final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearch());
|
||||
runner.setValidateExpressionUsage(false);
|
||||
|
||||
//Local Cluster - Mac pulled from brew
|
||||
runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE, "node");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.PATH_HOME, "/usr/local/opt/elasticsearch");
|
||||
runner.setProperty(PutElasticsearch.INDEX, "tweet");
|
||||
runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
|
||||
|
||||
runner.setProperty(PutElasticsearch.TYPE, "status");
|
||||
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue(twitterExample, new HashMap<String, String>() {{
|
||||
put("tweet_id", "28039652141");
|
||||
}});
|
||||
|
||||
runner.enqueue(twitterExample);
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -289,31 +308,25 @@ public class TestPutElasticsearch {
|
|||
runner.setValidateExpressionUsage(false);
|
||||
|
||||
//Local Cluster - Mac pulled from brew
|
||||
runner.setProperty(AbstractElasticsearchProcessor.CLIENT_TYPE, "transport");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s");
|
||||
runner.setProperty(PutElasticsearch.INDEX, "tweet");
|
||||
runner.setProperty(PutElasticsearch.INDEX, "doc");
|
||||
runner.setProperty(PutElasticsearch.BATCH_SIZE, "100");
|
||||
|
||||
runner.setProperty(PutElasticsearch.TYPE, "status");
|
||||
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "tweet_id");
|
||||
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
|
||||
runner.assertValid();
|
||||
|
||||
|
||||
JsonParser parser = new JsonParser();
|
||||
JsonObject json;
|
||||
String message = convertStreamToString(twitterExample);
|
||||
String message = convertStreamToString(docExample);
|
||||
for (int i = 0; i < 100; i++) {
|
||||
|
||||
json = parser.parse(message).getAsJsonObject();
|
||||
String id = json.get("id").getAsString();
|
||||
long newId = Long.parseLong(id) + i;
|
||||
long newId = 28039652140L + i;
|
||||
final String newStrId = Long.toString(newId);
|
||||
//json.addProperty("id", newId);
|
||||
runner.enqueue(message.getBytes(), new HashMap<String, String>() {{
|
||||
put("tweet_id", newStrId);
|
||||
put("doc_id", newStrId);
|
||||
}});
|
||||
|
||||
}
|
||||
|
@ -321,8 +334,6 @@ public class TestPutElasticsearch {
|
|||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 100);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
{
|
||||
"created_at": "Thu Jan 21 16:02:46 +0000 2016",
|
||||
"text": "This is a test document from a mock social media service",
|
||||
"contributors": null,
|
||||
"id": 28039652140,
|
||||
"shares": null,
|
||||
"geographic_location": null,
|
||||
"userinfo": {
|
||||
"name": "Not A. Person",
|
||||
"location": "Orlando, FL",
|
||||
"created_at": "Fri Oct 24 23:22:09 +0000 2008",
|
||||
"follow_count": 1,
|
||||
"url": "http://not.a.real.site",
|
||||
"id": 16958875,
|
||||
"lang": "en",
|
||||
"time_zone": "Mountain Time (US & Canada)",
|
||||
"description": "I'm a test person.",
|
||||
"following_count": 71,
|
||||
"screen_name": "Nobody"
|
||||
}
|
||||
}
|
|
@ -1,83 +0,0 @@
|
|||
|
||||
{
|
||||
"coordinates": null,
|
||||
"created_at": "Thu Oct 21 16:02:46 +0000 2010",
|
||||
"favorited": false,
|
||||
"truncated": false,
|
||||
"id_str": "28039652140",
|
||||
"entities": {
|
||||
"urls": [
|
||||
{
|
||||
"expanded_url": null,
|
||||
"url": "http://gnip.com/success_stories",
|
||||
"indices": [
|
||||
69,
|
||||
100
|
||||
]
|
||||
}
|
||||
],
|
||||
"hashtags": [
|
||||
|
||||
],
|
||||
"user_mentions": [
|
||||
{
|
||||
"name": "Gnip, Inc.",
|
||||
"id_str": "16958875",
|
||||
"id": 16958875,
|
||||
"indices": [
|
||||
25,
|
||||
30
|
||||
],
|
||||
"screen_name": "gnip"
|
||||
}
|
||||
]
|
||||
},
|
||||
"in_reply_to_user_id_str": null,
|
||||
"text": "what we've been up to at @gnip -- delivering data to happy customers http://gnip.com/success_stories",
|
||||
"contributors": null,
|
||||
"id": 28039652140,
|
||||
"retweet_count": null,
|
||||
"in_reply_to_status_id_str": null,
|
||||
"geo": null,
|
||||
"retweeted": false,
|
||||
"in_reply_to_user_id": null,
|
||||
"user": {
|
||||
"profile_sidebar_border_color": "C0DEED",
|
||||
"name": "Gnip, Inc.",
|
||||
"profile_sidebar_fill_color": "DDEEF6",
|
||||
"profile_background_tile": false,
|
||||
"profile_image_url": "http://a3.twimg.com/profile_images/62803643/icon_normal.png",
|
||||
"location": "Boulder, CO",
|
||||
"created_at": "Fri Oct 24 23:22:09 +0000 2008",
|
||||
"id_str": "16958875",
|
||||
"follow_request_sent": false,
|
||||
"profile_link_color": "0084B4",
|
||||
"favourites_count": 1,
|
||||
"url": "http://blog.gnip.com",
|
||||
"contributors_enabled": false,
|
||||
"utc_offset": -25200,
|
||||
"id": 16958875,
|
||||
"profile_use_background_image": true,
|
||||
"listed_count": 23,
|
||||
"protected": false,
|
||||
"lang": "en",
|
||||
"profile_text_color": "333333",
|
||||
"followers_count": 260,
|
||||
"time_zone": "Mountain Time (US & Canada)",
|
||||
"verified": false,
|
||||
"geo_enabled": true,
|
||||
"profile_background_color": "C0DEED",
|
||||
"notifications": false,
|
||||
"description": "Gnip makes it really easy for you to collect social data for your business.",
|
||||
"friends_count": 71,
|
||||
"profile_background_image_url": "http://s.twimg.com/a/1287010001/images/themes/theme1/bg.png",
|
||||
"statuses_count": 302,
|
||||
"screen_name": "gnip",
|
||||
"following": false,
|
||||
"show_all_inline_media": false
|
||||
},
|
||||
"in_reply_to_screen_name": null,
|
||||
"source": "web",
|
||||
"place": null,
|
||||
"in_reply_to_status_id": null
|
||||
}
|
|
@ -1,4 +1,14 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
|
||||
license agreements. See the NOTICE file distributed with this work for additional
|
||||
information regarding copyright ownership. The ASF licenses this file to
|
||||
You under the Apache License, Version 2.0 (the "License"); you may not use
|
||||
this file except in compliance with the License. You may obtain a copy of
|
||||
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
|
||||
by applicable law or agreed to in writing, software distributed under the
|
||||
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
|
||||
OF ANY KIND, either express or implied. See the License for the specific
|
||||
language governing permissions and limitations under the License. -->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
|
@ -23,4 +33,14 @@
|
|||
<module>nifi-elasticsearch-processors</module>
|
||||
</modules>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-elasticsearch-processors</artifactId>
|
||||
<version>0.4.2-SNAPSHOT</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
</project>
|
|
@ -52,6 +52,7 @@
|
|||
<module>nifi-riemann-bundle</module>
|
||||
<module>nifi-html-bundle</module>
|
||||
<module>nifi-scripting-bundle</module>
|
||||
<module>nifi-elasticsearch-bundle</module>
|
||||
</modules>
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
|
|
Loading…
Reference in New Issue