mirror of https://github.com/apache/nifi.git
NIFI-9600 Removed Elasticsearch 2 Processors
Signed-off-by: Joe Gresock <jgresock@gmail.com> This closes #5685.
This commit is contained in:
parent
fd1c1e354e
commit
a4791a55d4
|
@ -23,7 +23,6 @@ language governing permissions and limitations under the License. -->
|
|||
<properties>
|
||||
<maven.javadoc.skip>true</maven.javadoc.skip>
|
||||
<source.skip>true</source.skip>
|
||||
<lucene.version>5.3.1</lucene.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
|
@ -38,5 +37,4 @@ language governing permissions and limitations under the License. -->
|
|||
<artifactId>nifi-elasticsearch-processors</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -207,33 +207,3 @@ The Apache NiFi project contains subcomponents with separate copyright
|
|||
notices and license terms. Your use of the source code for the these
|
||||
subcomponents is subject to the terms and conditions of the following
|
||||
licenses.
|
||||
|
||||
The binary distribution of this product bundles 'HdrHistogram' which is available under a 2-Clause BSD style license:
|
||||
|
||||
Copyright (c) 2012, 2013, 2014 Gil Tene
|
||||
Copyright (c) 2014 Michael Barker
|
||||
Copyright (c) 2014 Matt Warren
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are met:
|
||||
|
||||
1. Redistributions of source code must retain the above copyright notice,
|
||||
this list of conditions and the following disclaimer.
|
||||
|
||||
2. Redistributions in binary form must reproduce the above copyright notice,
|
||||
this list of conditions and the following disclaimer in the documentation
|
||||
and/or other materials provided with the distribution.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
|
||||
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||||
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||||
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
|
||||
THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
|
|
|
@ -10,238 +10,11 @@ Apache Software License v2
|
|||
|
||||
The following binary components are provided under the Apache Software License v2
|
||||
|
||||
(ASLv2) Elasticsearch
|
||||
The following NOTICE information applies:
|
||||
Elasticsearch
|
||||
Copyright 2009-2015 Elasticsearch
|
||||
|
||||
(ASLv2) Apache Commons IO
|
||||
The following NOTICE information applies:
|
||||
Apache Commons IO
|
||||
Copyright 2002-2016 The Apache Software Foundation
|
||||
|
||||
(ASLv2) Apache Lucene
|
||||
The following NOTICE information applies:
|
||||
Apache Lucene
|
||||
Copyright 2014 The Apache Software Foundation
|
||||
|
||||
Includes software from other Apache Software Foundation projects,
|
||||
including, but not limited to:
|
||||
- Apache Ant
|
||||
- Apache Jakarta Regexp
|
||||
- Apache Commons
|
||||
- Apache Xerces
|
||||
|
||||
ICU4J, (under analysis/icu) is licensed under an MIT styles license
|
||||
and Copyright (c) 1995-2008 International Business Machines Corporation and others
|
||||
|
||||
Some data files (under analysis/icu/src/data) are derived from Unicode data such
|
||||
as the Unicode Character Database. See http://unicode.org/copyright.html for more
|
||||
details.
|
||||
|
||||
Brics Automaton (under core/src/java/org/apache/lucene/util/automaton) is
|
||||
BSD-licensed, created by Anders Møller. See http://www.brics.dk/automaton/
|
||||
|
||||
The levenshtein automata tables (under core/src/java/org/apache/lucene/util/automaton) were
|
||||
automatically generated with the moman/finenight FSA library, created by
|
||||
Jean-Philippe Barrette-LaPierre. This library is available under an MIT license,
|
||||
see http://sites.google.com/site/rrettesite/moman and
|
||||
http://bitbucket.org/jpbarrette/moman/overview/
|
||||
|
||||
The class org.apache.lucene.util.WeakIdentityMap was derived from
|
||||
the Apache CXF project and is Apache License 2.0.
|
||||
|
||||
The Google Code Prettify is Apache License 2.0.
|
||||
See http://code.google.com/p/google-code-prettify/
|
||||
|
||||
JUnit (junit-4.10) is licensed under the Common Public License v. 1.0
|
||||
See http://junit.sourceforge.net/cpl-v10.html
|
||||
|
||||
This product includes code (JaspellTernarySearchTrie) from Java Spelling Checkin
|
||||
g Package (jaspell): http://jaspell.sourceforge.net/
|
||||
License: The BSD License (http://www.opensource.org/licenses/bsd-license.php)
|
||||
|
||||
The snowball stemmers in
|
||||
analysis/common/src/java/net/sf/snowball
|
||||
were developed by Martin Porter and Richard Boulton.
|
||||
The snowball stopword lists in
|
||||
analysis/common/src/resources/org/apache/lucene/analysis/snowball
|
||||
were developed by Martin Porter and Richard Boulton.
|
||||
The full snowball package is available from
|
||||
http://snowball.tartarus.org/
|
||||
|
||||
The KStem stemmer in
|
||||
analysis/common/src/org/apache/lucene/analysis/en
|
||||
was developed by Bob Krovetz and Sergio Guzman-Lara (CIIR-UMass Amherst)
|
||||
under the BSD-license.
|
||||
|
||||
The Arabic,Persian,Romanian,Bulgarian, and Hindi analyzers (common) come with a default
|
||||
stopword list that is BSD-licensed created by Jacques Savoy. These files reside in:
|
||||
analysis/common/src/resources/org/apache/lucene/analysis/ar/stopwords.txt,
|
||||
analysis/common/src/resources/org/apache/lucene/analysis/fa/stopwords.txt,
|
||||
analysis/common/src/resources/org/apache/lucene/analysis/ro/stopwords.txt,
|
||||
analysis/common/src/resources/org/apache/lucene/analysis/bg/stopwords.txt,
|
||||
analysis/common/src/resources/org/apache/lucene/analysis/hi/stopwords.txt
|
||||
See http://members.unine.ch/jacques.savoy/clef/index.html.
|
||||
|
||||
The German,Spanish,Finnish,French,Hungarian,Italian,Portuguese,Russian and Swedish light stemmers
|
||||
(common) are based on BSD-licensed reference implementations created by Jacques Savoy and
|
||||
Ljiljana Dolamic. These files reside in:
|
||||
analysis/common/src/java/org/apache/lucene/analysis/de/GermanLightStemmer.java
|
||||
analysis/common/src/java/org/apache/lucene/analysis/de/GermanMinimalStemmer.java
|
||||
analysis/common/src/java/org/apache/lucene/analysis/es/SpanishLightStemmer.java
|
||||
analysis/common/src/java/org/apache/lucene/analysis/fi/FinnishLightStemmer.java
|
||||
analysis/common/src/java/org/apache/lucene/analysis/fr/FrenchLightStemmer.java
|
||||
analysis/common/src/java/org/apache/lucene/analysis/fr/FrenchMinimalStemmer.java
|
||||
analysis/common/src/java/org/apache/lucene/analysis/hu/HungarianLightStemmer.java
|
||||
analysis/common/src/java/org/apache/lucene/analysis/it/ItalianLightStemmer.java
|
||||
analysis/common/src/java/org/apache/lucene/analysis/pt/PortugueseLightStemmer.java
|
||||
analysis/common/src/java/org/apache/lucene/analysis/ru/RussianLightStemmer.java
|
||||
analysis/common/src/java/org/apache/lucene/analysis/sv/SwedishLightStemmer.java
|
||||
|
||||
The Stempel analyzer (stempel) includes BSD-licensed software developed
|
||||
by the Egothor project http://egothor.sf.net/, created by Leo Galambos, Martin Kvapil,
|
||||
and Edmond Nolan.
|
||||
|
||||
The Polish analyzer (stempel) comes with a default
|
||||
stopword list that is BSD-licensed created by the Carrot2 project. The file resides
|
||||
in stempel/src/resources/org/apache/lucene/analysis/pl/stopwords.txt.
|
||||
See http://project.carrot2.org/license.html.
|
||||
|
||||
The SmartChineseAnalyzer source code (smartcn) was
|
||||
provided by Xiaoping Gao and copyright 2009 by www.imdict.net.
|
||||
|
||||
WordBreakTestUnicode_*.java (under modules/analysis/common/src/test/)
|
||||
is derived from Unicode data such as the Unicode Character Database.
|
||||
See http://unicode.org/copyright.html for more details.
|
||||
|
||||
The Morfologik analyzer (morfologik) includes BSD-licensed software
|
||||
developed by Dawid Weiss and Marcin Miłkowski (http://morfologik.blogspot.com/).
|
||||
|
||||
Morfologik uses data from Polish ispell/myspell dictionary
|
||||
(http://www.sjp.pl/slownik/en/) licenced on the terms of (inter alia)
|
||||
LGPL and Creative Commons ShareAlike.
|
||||
|
||||
Morfologic includes data from BSD-licensed dictionary of Polish (SGJP)
|
||||
(http://sgjp.pl/morfeusz/)
|
||||
|
||||
Servlet-api.jar and javax.servlet-*.jar are under the CDDL license, the original
|
||||
source code for this can be found at http://www.eclipse.org/jetty/downloads.php
|
||||
|
||||
===========================================================================
|
||||
Kuromoji Japanese Morphological Analyzer - Apache Lucene Integration
|
||||
===========================================================================
|
||||
|
||||
This software includes a binary and/or source version of data from
|
||||
|
||||
mecab-ipadic-2.7.0-20070801
|
||||
|
||||
which can be obtained from
|
||||
|
||||
http://atilika.com/releases/mecab-ipadic/mecab-ipadic-2.7.0-20070801.tar.gz
|
||||
|
||||
or
|
||||
|
||||
http://jaist.dl.sourceforge.net/project/mecab/mecab-ipadic/2.7.0-20070801/mecab-ipadic-2.7.0-20070801.tar.gz
|
||||
|
||||
===========================================================================
|
||||
mecab-ipadic-2.7.0-20070801 Notice
|
||||
===========================================================================
|
||||
|
||||
Nara Institute of Science and Technology (NAIST),
|
||||
the copyright holders, disclaims all warranties with regard to this
|
||||
software, including all implied warranties of merchantability and
|
||||
fitness, in no event shall NAIST be liable for
|
||||
any special, indirect or consequential damages or any damages
|
||||
whatsoever resulting from loss of use, data or profits, whether in an
|
||||
action of contract, negligence or other tortuous action, arising out
|
||||
of or in connection with the use or performance of this software.
|
||||
|
||||
A large portion of the dictionary entries
|
||||
originate from ICOT Free Software. The following conditions for ICOT
|
||||
Free Software applies to the current dictionary as well.
|
||||
|
||||
Each User may also freely distribute the Program, whether in its
|
||||
original form or modified, to any third party or parties, PROVIDED
|
||||
that the provisions of Section 3 ("NO WARRANTY") will ALWAYS appear
|
||||
on, or be attached to, the Program, which is distributed substantially
|
||||
in the same form as set out herein and that such intended
|
||||
distribution, if actually made, will neither violate or otherwise
|
||||
contravene any of the laws and regulations of the countries having
|
||||
jurisdiction over the User or the intended distribution itself.
|
||||
|
||||
NO WARRANTY
|
||||
|
||||
The program was produced on an experimental basis in the course of the
|
||||
research and development conducted during the project and is provided
|
||||
to users as so produced on an experimental basis. Accordingly, the
|
||||
program is provided without any warranty whatsoever, whether express,
|
||||
implied, statutory or otherwise. The term "warranty" used herein
|
||||
includes, but is not limited to, any warranty of the quality,
|
||||
performance, merchantability and fitness for a particular purpose of
|
||||
the program and the nonexistence of any infringement or violation of
|
||||
any right of any third party.
|
||||
|
||||
Each user of the program will agree and understand, and be deemed to
|
||||
have agreed and understood, that there is no warranty whatsoever for
|
||||
the program and, accordingly, the entire risk arising from or
|
||||
otherwise connected with the program is assumed by the user.
|
||||
|
||||
Therefore, neither ICOT, the copyright holder, or any other
|
||||
organization that participated in or was otherwise related to the
|
||||
development of the program and their respective officials, directors,
|
||||
officers and other employees shall be held liable for any and all
|
||||
damages, including, without limitation, general, special, incidental
|
||||
and consequential damages, arising out of or otherwise in connection
|
||||
with the use or inability to use the program or any product, material
|
||||
or result produced or otherwise obtained by using the program,
|
||||
regardless of whether they have been advised of, or otherwise had
|
||||
knowledge of, the possibility of such damages at any time during the
|
||||
project or thereafter. Each user will be deemed to have agreed to the
|
||||
foregoing by his or her commencement of use of the program. The term
|
||||
"use" as used herein includes, but is not limited to, the use,
|
||||
modification, copying and distribution of the program and the
|
||||
production of secondary products from the program.
|
||||
|
||||
In the case where the program, whether in its original form or
|
||||
modified, was distributed or delivered to or received by a user from
|
||||
any person, organization or entity other than ICOT, unless it makes or
|
||||
grants independently of ICOT any specific warranty to the user in
|
||||
writing, such person, organization or entity, will also be exempted
|
||||
from and not be held liable to the user for any such damages as noted
|
||||
above as far as the program is concerned.
|
||||
|
||||
(ASLv2) Carrotsearch HPPC
|
||||
The following NOTICE information applies:
|
||||
HPPC borrowed code, ideas or both from:
|
||||
|
||||
* Apache Lucene, http://lucene.apache.org/
|
||||
(Apache license)
|
||||
* Fastutil, http://fastutil.di.unimi.it/
|
||||
(Apache license)
|
||||
* Koloboke, https://github.com/OpenHFT/Koloboke
|
||||
(Apache license)
|
||||
|
||||
(ASLv2) Joda Time
|
||||
The following NOTICE information applies:
|
||||
This product includes software developed by
|
||||
Joda.org (http://www.joda.org/).
|
||||
|
||||
(ASLv2) The Netty Project
|
||||
The following NOTICE information applies:
|
||||
The Netty Project
|
||||
Copyright 2011 The Netty Project
|
||||
|
||||
(ASLv2) t-digest
|
||||
The following NOTICE information applies:
|
||||
The code for the t-digest was originally authored by Ted Dunning
|
||||
A number of small but very helpful changes have been contributed by Adrien Grand (https://github.com/jpountz)
|
||||
|
||||
(ASLv2) Apache Commons-CLI
|
||||
The following NOTICE information applies:
|
||||
Apache Commons CLI
|
||||
Copyright 2001-2016 The Apache Software Foundation
|
||||
|
||||
(ASLv2) Jackson JSON processor
|
||||
The following NOTICE information applies:
|
||||
# Jackson JSON processor
|
||||
|
@ -264,12 +37,3 @@ The following binary components are provided under the Apache Software License v
|
|||
A list of contributors may be found from CREDITS file, which is included
|
||||
in some artifacts (usually source distributions); but is always available
|
||||
from the source code management (SCM) system project uses.
|
||||
|
||||
*****************
|
||||
Public Domain
|
||||
*****************
|
||||
|
||||
The following binary components are provided under the Creative Commons Zero license version 1.0. See project link for details.
|
||||
|
||||
(CC0v1.0) JSR166e for Twitter (com.twitter:jsr166e:jar:1.1.0 - https://github.com/twitter/jsr166e)
|
||||
|
||||
|
|
|
@ -20,12 +20,6 @@ language governing permissions and limitations under the License. -->
|
|||
<artifactId>nifi-elasticsearch-processors</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<properties>
|
||||
<slf4jversion>1.7.12</slf4jversion>
|
||||
<es.version>2.1.0</es.version>
|
||||
<lucene.version>5.3.1</lucene.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
|
@ -64,11 +58,6 @@ language governing permissions and limitations under the License. -->
|
|||
<artifactId>commons-text</artifactId>
|
||||
<version>1.8</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.lucene</groupId>
|
||||
<artifactId>lucene-core</artifactId>
|
||||
<version>${lucene.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mock</artifactId>
|
||||
|
@ -93,17 +82,6 @@ language governing permissions and limitations under the License. -->
|
|||
<version>1.16.0-SNAPSHOT</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.elasticsearch</groupId>
|
||||
<artifactId>elasticsearch</artifactId>
|
||||
<version>${es.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-core</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.squareup.okhttp3</groupId>
|
||||
<artifactId>okhttp</artifactId>
|
||||
|
@ -138,12 +116,6 @@ language governing permissions and limitations under the License. -->
|
|||
<artifactId>nifi-standard-record-utils</artifactId>
|
||||
<version>1.16.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<!-- Override Netty 3 -->
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty</artifactId>
|
||||
<version>${netty.3.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
|
|
@ -1,266 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.elasticsearch;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.resource.ResourceCardinality;
|
||||
import org.apache.nifi.components.resource.ResourceType;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
|
||||
import java.io.File;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public abstract class AbstractElasticsearchTransportClientProcessor extends AbstractElasticsearchProcessor {
|
||||
|
||||
protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder()
|
||||
.name("Cluster Name")
|
||||
.description("Name of the ES cluster (for example, elasticsearch_brew). Defaults to 'elasticsearch'")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.defaultValue("elasticsearch")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
protected static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder()
|
||||
.name("ElasticSearch Hosts")
|
||||
.description("ElasticSearch Hosts, which should be comma separated and colon for hostname/port "
|
||||
+ "host1:port,host2:port,.... For example testcluster:9300. This processor uses the Transport Client to "
|
||||
+ "connect to hosts. The default transport client port is 9300.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PROP_SHIELD_LOCATION = new PropertyDescriptor.Builder()
|
||||
.name("Shield Plugin Filename")
|
||||
.description("Specifies the path to the JAR for the Elasticsearch Shield plugin. "
|
||||
+ "If the Elasticsearch cluster has been secured with the Shield plugin, then the Shield plugin "
|
||||
+ "JAR must also be available to this processor. Note: Do NOT place the Shield JAR into NiFi's "
|
||||
+ "lib/ directory, doing so will prevent the Shield plugin from being loaded.")
|
||||
.required(false)
|
||||
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE, ResourceType.DIRECTORY)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder()
|
||||
.name("ElasticSearch Ping Timeout")
|
||||
.description("The ping timeout used to determine when a node is unreachable. " +
|
||||
"For example, 5s (5 seconds). If non-local recommended is 30s")
|
||||
.required(true)
|
||||
.defaultValue("5s")
|
||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder()
|
||||
.name("Sampler Interval")
|
||||
.description("How often to sample / ping the nodes listed and connected. For example, 5s (5 seconds). "
|
||||
+ "If non-local recommended is 30s.")
|
||||
.required(true)
|
||||
.defaultValue("5s")
|
||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
protected AtomicReference<Client> esClient = new AtomicReference<>();
|
||||
protected List<InetSocketAddress> esHosts;
|
||||
protected String authToken;
|
||||
|
||||
/**
|
||||
* Instantiate ElasticSearch Client. This should be called by subclasses' @OnScheduled method to create a client
|
||||
* if one does not yet exist. If called when scheduled, closeClient() should be called by the subclasses' @OnStopped
|
||||
* method so the client will be destroyed when the processor is stopped.
|
||||
*
|
||||
* @param context The context for this processor
|
||||
* @throws ProcessException if an error occurs while creating an Elasticsearch client
|
||||
*/
|
||||
@Override
|
||||
protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
|
||||
|
||||
ComponentLog log = getLogger();
|
||||
if (esClient.get() != null) {
|
||||
return;
|
||||
}
|
||||
|
||||
log.debug("Creating ElasticSearch Client");
|
||||
try {
|
||||
final String clusterName = context.getProperty(CLUSTER_NAME).evaluateAttributeExpressions().getValue();
|
||||
final String pingTimeout = context.getProperty(PING_TIMEOUT).evaluateAttributeExpressions().getValue();
|
||||
final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).evaluateAttributeExpressions().getValue();
|
||||
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
|
||||
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
|
||||
|
||||
final SSLContextService sslService =
|
||||
context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||
|
||||
Settings.Builder settingsBuilder = Settings.settingsBuilder()
|
||||
.put("cluster.name", clusterName)
|
||||
.put("client.transport.ping_timeout", pingTimeout)
|
||||
.put("client.transport.nodes_sampler_interval", samplerInterval);
|
||||
|
||||
String shieldUrl = context.getProperty(PROP_SHIELD_LOCATION).evaluateAttributeExpressions().getValue();
|
||||
if (sslService != null) {
|
||||
settingsBuilder.put("shield.transport.ssl", "true")
|
||||
.put("shield.ssl.keystore.path", sslService.getKeyStoreFile())
|
||||
.put("shield.ssl.keystore.password", sslService.getKeyStorePassword())
|
||||
.put("shield.ssl.truststore.path", sslService.getTrustStoreFile())
|
||||
.put("shield.ssl.truststore.password", sslService.getTrustStorePassword());
|
||||
}
|
||||
|
||||
// Set username and password for Shield
|
||||
if (!StringUtils.isEmpty(username)) {
|
||||
StringBuffer shieldUser = new StringBuffer(username);
|
||||
if (!StringUtils.isEmpty(password)) {
|
||||
shieldUser.append(":");
|
||||
shieldUser.append(password);
|
||||
}
|
||||
settingsBuilder.put("shield.user", shieldUser);
|
||||
|
||||
}
|
||||
|
||||
TransportClient transportClient = getTransportClient(settingsBuilder, shieldUrl, username, password);
|
||||
|
||||
final String hosts = context.getProperty(HOSTS).evaluateAttributeExpressions().getValue();
|
||||
esHosts = getEsHosts(hosts);
|
||||
|
||||
if (esHosts != null) {
|
||||
for (final InetSocketAddress host : esHosts) {
|
||||
try {
|
||||
transportClient.addTransportAddress(new InetSocketTransportAddress(host));
|
||||
} catch (IllegalArgumentException iae) {
|
||||
log.error("Could not add transport address {}", new Object[]{host});
|
||||
}
|
||||
}
|
||||
}
|
||||
esClient.set(transportClient);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to create Elasticsearch client due to {}", new Object[]{e}, e);
|
||||
throw new ProcessException(e);
|
||||
}
|
||||
}
|
||||
|
||||
protected TransportClient getTransportClient(Settings.Builder settingsBuilder, String shieldUrl,
|
||||
String username, String password)
|
||||
throws MalformedURLException {
|
||||
|
||||
// Create new transport client using the Builder pattern
|
||||
TransportClient.Builder builder = TransportClient.builder();
|
||||
|
||||
// See if the Elasticsearch Shield JAR location was specified, and add the plugin if so. Also create the
|
||||
// authorization token if username and password are supplied.
|
||||
final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
|
||||
if (!StringUtils.isBlank(shieldUrl)) {
|
||||
ClassLoader shieldClassLoader =
|
||||
new URLClassLoader(new URL[]{new File(shieldUrl).toURI().toURL()}, this.getClass().getClassLoader());
|
||||
Thread.currentThread().setContextClassLoader(shieldClassLoader);
|
||||
|
||||
try {
|
||||
Class shieldPluginClass = Class.forName("org.elasticsearch.shield.ShieldPlugin", true, shieldClassLoader);
|
||||
builder = builder.addPlugin(shieldPluginClass);
|
||||
|
||||
if (!StringUtils.isEmpty(username) && !StringUtils.isEmpty(password)) {
|
||||
|
||||
// Need a couple of classes from the Shield plugin to build the token
|
||||
Class usernamePasswordTokenClass =
|
||||
Class.forName("org.elasticsearch.shield.authc.support.UsernamePasswordToken", true, shieldClassLoader);
|
||||
|
||||
Class securedStringClass =
|
||||
Class.forName("org.elasticsearch.shield.authc.support.SecuredString", true, shieldClassLoader);
|
||||
|
||||
Constructor<?> securedStringCtor = securedStringClass.getConstructor(char[].class);
|
||||
Object securePasswordString = securedStringCtor.newInstance(password.toCharArray());
|
||||
|
||||
Method basicAuthHeaderValue = usernamePasswordTokenClass.getMethod("basicAuthHeaderValue", String.class, securedStringClass);
|
||||
authToken = (String) basicAuthHeaderValue.invoke(null, username, securePasswordString);
|
||||
}
|
||||
} catch (ClassNotFoundException
|
||||
| NoSuchMethodException
|
||||
| InstantiationException
|
||||
| IllegalAccessException
|
||||
| InvocationTargetException shieldLoadException) {
|
||||
getLogger().debug("Did not detect Elasticsearch Shield plugin, secure connections and/or authorization will not be available");
|
||||
}
|
||||
} else {
|
||||
getLogger().debug("No Shield plugin location specified, secure connections and/or authorization will not be available");
|
||||
}
|
||||
TransportClient transportClient = builder.settings(settingsBuilder.build()).build();
|
||||
Thread.currentThread().setContextClassLoader(originalClassLoader);
|
||||
return transportClient;
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispose of ElasticSearch client
|
||||
*/
|
||||
public void closeClient() {
|
||||
if (esClient.get() != null) {
|
||||
getLogger().info("Closing ElasticSearch Client");
|
||||
esClient.get().close();
|
||||
esClient.set(null);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the ElasticSearch hosts from a NiFi attribute, e.g.
|
||||
*
|
||||
* @param hosts A comma-separated list of ElasticSearch hosts (host:port,host2:port2, etc.)
|
||||
* @return List of InetSocketAddresses for the ES hosts
|
||||
*/
|
||||
private List<InetSocketAddress> getEsHosts(String hosts) {
|
||||
|
||||
if (hosts == null) {
|
||||
return null;
|
||||
}
|
||||
final List<String> esList = Arrays.asList(hosts.split(","));
|
||||
List<InetSocketAddress> esHosts = new ArrayList<>();
|
||||
|
||||
for (String item : esList) {
|
||||
|
||||
String[] addresses = item.split(":");
|
||||
if (addresses.length != 2) {
|
||||
throw new ArrayIndexOutOfBoundsException("Not in host:port format");
|
||||
}
|
||||
final String hostName = addresses[0].trim();
|
||||
final int port = Integer.parseInt(addresses[1].trim());
|
||||
|
||||
esHosts.add(new InetSocketAddress(hostName, port));
|
||||
}
|
||||
return esHosts;
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -1,231 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.elasticsearch;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.DeprecationNotice;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.get.GetRequestBuilder;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Deprecated
|
||||
@DeprecationNotice(classNames = {"org.apache.nifi.processors.elasticsearch.GetElasticsearch"},
|
||||
reason = "This processor is deprecated and may be removed in future releases.")
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
||||
@EventDriven
|
||||
@SupportsBatching
|
||||
@Tags({"elasticsearch", "fetch", "read", "get"})
|
||||
@CapabilityDescription("Retrieves a document from Elasticsearch using the specified connection properties and the "
|
||||
+ "identifier of the document to retrieve. If the cluster has been configured for authorization and/or secure "
|
||||
+ "transport (SSL/TLS) and the Shield plugin is available, secure connections can be made. This processor "
|
||||
+ "supports Elasticsearch 2.x clusters.")
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute = "filename", description = "The filename attributes is set to the document identifier"),
|
||||
@WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
|
||||
@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type")
|
||||
})
|
||||
public class FetchElasticsearch extends AbstractElasticsearchTransportClientProcessor {
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
|
||||
.description("All FlowFiles that are read from Elasticsearch are routed to this relationship").build();
|
||||
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
|
||||
.description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship").build();
|
||||
|
||||
public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
|
||||
.description("A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may succeed")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found")
|
||||
.description("A FlowFile is routed to this relationship if the specified document does not exist in the Elasticsearch cluster")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor DOC_ID = new PropertyDescriptor.Builder()
|
||||
.name("Document Identifier")
|
||||
.description("The identifier for the document to be fetched")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
|
||||
.name("Index")
|
||||
.description("The name of the index to read from")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
|
||||
.name("Type")
|
||||
.description("The type of this document (used by Elasticsearch for indexing and searching)")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
|
||||
private static final Set<Relationship> relationships;
|
||||
private static final List<PropertyDescriptor> propertyDescriptors;
|
||||
|
||||
static {
|
||||
final Set<Relationship> _rels = new HashSet<>();
|
||||
_rels.add(REL_SUCCESS);
|
||||
_rels.add(REL_FAILURE);
|
||||
_rels.add(REL_RETRY);
|
||||
_rels.add(REL_NOT_FOUND);
|
||||
relationships = Collections.unmodifiableSet(_rels);
|
||||
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
descriptors.add(CLUSTER_NAME);
|
||||
descriptors.add(HOSTS);
|
||||
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
|
||||
descriptors.add(PROP_SHIELD_LOCATION);
|
||||
descriptors.add(USERNAME);
|
||||
descriptors.add(PASSWORD);
|
||||
descriptors.add(PING_TIMEOUT);
|
||||
descriptors.add(SAMPLER_INTERVAL);
|
||||
descriptors.add(DOC_ID);
|
||||
descriptors.add(INDEX);
|
||||
descriptors.add(TYPE);
|
||||
descriptors.add(CHARSET);
|
||||
|
||||
propertyDescriptors = Collections.unmodifiableList(descriptors);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return relationships;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return propertyDescriptors;
|
||||
}
|
||||
|
||||
@Override
|
||||
@OnScheduled
|
||||
public void setup(ProcessContext context) {
|
||||
super.setup(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
|
||||
FlowFile flowFile = session.get();
|
||||
if (flowFile == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
|
||||
final String docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
|
||||
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
|
||||
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
|
||||
|
||||
final ComponentLog logger = getLogger();
|
||||
try {
|
||||
|
||||
logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId});
|
||||
final long startNanos = System.nanoTime();
|
||||
|
||||
GetRequestBuilder getRequestBuilder = esClient.get().prepareGet(index, docType, docId);
|
||||
if (authToken != null) {
|
||||
getRequestBuilder.putHeader("Authorization", authToken);
|
||||
}
|
||||
final GetResponse getResponse = getRequestBuilder.execute().actionGet();
|
||||
|
||||
if (getResponse == null || !getResponse.isExists()) {
|
||||
logger.debug("Failed to read {}/{}/{} from Elasticsearch: Document not found",
|
||||
new Object[]{index, docType, docId});
|
||||
|
||||
// We couldn't find the document, so penalize it and send it to "not found"
|
||||
flowFile = session.penalize(flowFile);
|
||||
session.transfer(flowFile, REL_NOT_FOUND);
|
||||
} else {
|
||||
flowFile = session.putAttribute(flowFile, "filename", docId);
|
||||
flowFile = session.putAttribute(flowFile, "es.index", index);
|
||||
flowFile = session.putAttribute(flowFile, "es.type", docType);
|
||||
flowFile = session.write(flowFile, new OutputStreamCallback() {
|
||||
@Override
|
||||
public void process(OutputStream out) throws IOException {
|
||||
out.write(getResponse.getSourceAsString().getBytes(charset));
|
||||
}
|
||||
});
|
||||
logger.debug("Elasticsearch document " + docId + " fetched, routing to success");
|
||||
|
||||
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||
final String uri = context.getProperty(HOSTS).evaluateAttributeExpressions().getValue() + "/" + index + "/" + docType + "/" + docId;
|
||||
session.getProvenanceReporter().fetch(flowFile, uri, millis);
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
}
|
||||
} catch (NoNodeAvailableException
|
||||
| ElasticsearchTimeoutException
|
||||
| ReceiveTimeoutTransportException
|
||||
| NodeClosedException exceptionToRetry) {
|
||||
logger.error("Failed to read into Elasticsearch due to {}, this may indicate an error in configuration "
|
||||
+ "(hosts, username/password, etc.). Routing to retry",
|
||||
new Object[]{exceptionToRetry.getLocalizedMessage()}, exceptionToRetry);
|
||||
session.transfer(flowFile, REL_RETRY);
|
||||
context.yield();
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.error("Failed to read {} from Elasticsearch due to {}", new Object[]{flowFile, e.getLocalizedMessage()}, e);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
context.yield();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispose of ElasticSearch client
|
||||
*/
|
||||
@Override
|
||||
@OnStopped
|
||||
public void closeClient() {
|
||||
super.closeClient();
|
||||
}
|
||||
}
|
|
@ -1,284 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.elasticsearch;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.behavior.SystemResource;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.expression.AttributeExpression;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.InputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
|
||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
||||
@EventDriven
|
||||
@SupportsBatching
|
||||
@Tags({"elasticsearch", "insert", "update", "write", "put"})
|
||||
@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as "
|
||||
+ "the index to insert into and the type of the document. If the cluster has been configured for authorization "
|
||||
+ "and/or secure transport (SSL/TLS) and the Shield plugin is available, secure connections can be made. This processor "
|
||||
+ "supports Elasticsearch 2.x clusters.")
|
||||
@SystemResourceConsideration(resource = SystemResource.MEMORY)
|
||||
public class PutElasticsearch extends AbstractElasticsearchTransportClientProcessor {
|
||||
|
||||
static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
|
||||
.description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build();
|
||||
|
||||
static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
|
||||
.description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build();
|
||||
|
||||
static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
|
||||
.description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder()
|
||||
.name("Identifier Attribute")
|
||||
.description("The name of the attribute containing the identifier for each FlowFile")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
|
||||
.name("Index")
|
||||
.description("The name of the index to insert into")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
|
||||
AttributeExpression.ResultType.STRING, true))
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
|
||||
.name("Type")
|
||||
.description("The type of this document (used by Elasticsearch for indexing and searching)")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
|
||||
.name("Index Operation")
|
||||
.description("The type of the operation used to index (index, update, upsert)")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
|
||||
.defaultValue("index")
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
|
||||
.name("Batch Size")
|
||||
.description("The preferred number of FlowFiles to put to the database in a single transaction")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||
.defaultValue("100")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
private static final Set<Relationship> relationships;
|
||||
private static final List<PropertyDescriptor> propertyDescriptors;
|
||||
|
||||
static {
|
||||
final Set<Relationship> _rels = new HashSet<>();
|
||||
_rels.add(REL_SUCCESS);
|
||||
_rels.add(REL_FAILURE);
|
||||
_rels.add(REL_RETRY);
|
||||
relationships = Collections.unmodifiableSet(_rels);
|
||||
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>();
|
||||
descriptors.add(CLUSTER_NAME);
|
||||
descriptors.add(HOSTS);
|
||||
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
|
||||
descriptors.add(PROP_SHIELD_LOCATION);
|
||||
descriptors.add(USERNAME);
|
||||
descriptors.add(PASSWORD);
|
||||
descriptors.add(PING_TIMEOUT);
|
||||
descriptors.add(SAMPLER_INTERVAL);
|
||||
descriptors.add(ID_ATTRIBUTE);
|
||||
descriptors.add(INDEX);
|
||||
descriptors.add(TYPE);
|
||||
descriptors.add(CHARSET);
|
||||
descriptors.add(BATCH_SIZE);
|
||||
descriptors.add(INDEX_OP);
|
||||
|
||||
propertyDescriptors = Collections.unmodifiableList(descriptors);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return relationships;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return propertyDescriptors;
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void setup(ProcessContext context) {
|
||||
super.setup(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
|
||||
final ComponentLog logger = getLogger();
|
||||
final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue();
|
||||
final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
|
||||
|
||||
final List<FlowFile> flowFiles = session.get(batchSize);
|
||||
if (flowFiles.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Keep track of the list of flow files that need to be transferred. As they are transferred, remove them from the list.
|
||||
List<FlowFile> flowFilesToTransfer = new LinkedList<>(flowFiles);
|
||||
try {
|
||||
final BulkRequestBuilder bulk = esClient.get().prepareBulk();
|
||||
if (authToken != null) {
|
||||
bulk.putHeader("Authorization", authToken);
|
||||
}
|
||||
|
||||
for (FlowFile file : flowFiles) {
|
||||
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue();
|
||||
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(file).getValue();
|
||||
final String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(file).getValue();
|
||||
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(file).getValue());
|
||||
|
||||
final String id = file.getAttribute(id_attribute);
|
||||
if (id == null) {
|
||||
logger.error("No value in identifier attribute {} for {}, transferring to failure", new Object[]{id_attribute, file});
|
||||
flowFilesToTransfer.remove(file);
|
||||
session.transfer(file, REL_FAILURE);
|
||||
} else {
|
||||
session.read(file, new InputStreamCallback() {
|
||||
@Override
|
||||
public void process(final InputStream in) throws IOException {
|
||||
String json = IOUtils.toString(in, charset)
|
||||
.replace("\r\n", " ").replace('\n', ' ').replace('\r', ' ');
|
||||
|
||||
if (indexOp.equalsIgnoreCase("index")) {
|
||||
bulk.add(esClient.get().prepareIndex(index, docType, id)
|
||||
.setSource(json.getBytes(charset)));
|
||||
} else if (indexOp.equalsIgnoreCase("upsert")) {
|
||||
bulk.add(esClient.get().prepareUpdate(index, docType, id)
|
||||
.setDoc(json.getBytes(charset))
|
||||
.setDocAsUpsert(true));
|
||||
} else if (indexOp.equalsIgnoreCase("update")) {
|
||||
bulk.add(esClient.get().prepareUpdate(index, docType, id)
|
||||
.setDoc(json.getBytes(charset)));
|
||||
} else {
|
||||
throw new IOException("Index operation: " + indexOp + " not supported.");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
final BulkResponse response = bulk.execute().actionGet();
|
||||
if (response.hasFailures()) {
|
||||
// Responses are guaranteed to be in order, remove them in reverse order
|
||||
BulkItemResponse[] responses = response.getItems();
|
||||
if (responses != null && responses.length > 0) {
|
||||
for (int i = responses.length - 1; i >= 0; i--) {
|
||||
final FlowFile flowFile = flowFilesToTransfer.get(i);
|
||||
if (responses[i].isFailed()) {
|
||||
logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure",
|
||||
new Object[]{flowFile, responses[i].getFailure().getMessage()});
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
|
||||
} else {
|
||||
session.getProvenanceReporter().send(flowFile, context.getProperty(HOSTS).evaluateAttributeExpressions().getValue() + "/" + responses[i].getIndex());
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
}
|
||||
flowFilesToTransfer.remove(flowFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Transfer any remaining flowfiles to success
|
||||
flowFilesToTransfer.forEach(file -> {
|
||||
session.transfer(file, REL_SUCCESS);
|
||||
// Record provenance event
|
||||
session.getProvenanceReporter().send(file, context.getProperty(HOSTS).evaluateAttributeExpressions().getValue() + "/" +
|
||||
context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue());
|
||||
});
|
||||
|
||||
} catch (NoNodeAvailableException
|
||||
| ElasticsearchTimeoutException
|
||||
| ReceiveTimeoutTransportException
|
||||
| NodeClosedException exceptionToRetry) {
|
||||
|
||||
// Authorization errors and other problems are often returned as NoNodeAvailableExceptions without a
|
||||
// traceable cause. However the cause seems to be logged, just not available to this caught exception.
|
||||
// Since the error message will show up as a bulletin, we make specific mention to check the logs for
|
||||
// more details.
|
||||
logger.error("Failed to insert into Elasticsearch due to {}. More detailed information may be available in " +
|
||||
"the NiFi logs.",
|
||||
new Object[]{exceptionToRetry.getLocalizedMessage()}, exceptionToRetry);
|
||||
session.transfer(flowFilesToTransfer, REL_RETRY);
|
||||
context.yield();
|
||||
|
||||
} catch (Exception exceptionToFail) {
|
||||
logger.error("Failed to insert into Elasticsearch due to {}, transferring to failure",
|
||||
new Object[]{exceptionToFail.getLocalizedMessage()}, exceptionToFail);
|
||||
|
||||
session.transfer(flowFilesToTransfer, REL_FAILURE);
|
||||
context.yield();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispose of ElasticSearch client
|
||||
*/
|
||||
@OnStopped
|
||||
public void closeClient() {
|
||||
super.closeClient();
|
||||
}
|
||||
}
|
|
@ -12,8 +12,6 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
org.apache.nifi.processors.elasticsearch.FetchElasticsearch
|
||||
org.apache.nifi.processors.elasticsearch.PutElasticsearch
|
||||
org.apache.nifi.processors.elasticsearch.FetchElasticsearchHttp
|
||||
org.apache.nifi.processors.elasticsearch.PutElasticsearchHttp
|
||||
org.apache.nifi.processors.elasticsearch.PutElasticsearchHttpRecord
|
||||
|
|
|
@ -1,92 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.elasticsearch;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ITQueryElasticsearchHttp {
|
||||
|
||||
private TestRunner runner;
|
||||
|
||||
@After
|
||||
public void teardown() {
|
||||
runner = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchElasticsearchOnTrigger() throws IOException {
|
||||
runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found
|
||||
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
|
||||
"http://localhost.internal:9200");
|
||||
|
||||
runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(QueryElasticsearchHttp.TYPE, "provenance");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(QueryElasticsearchHttp.QUERY,
|
||||
"identifier:2f79eba8839f5976cd0b1e16a0e7fe8d7dd0ceca");
|
||||
runner.setProperty(QueryElasticsearchHttp.SORT, "timestamp:asc");
|
||||
runner.setProperty(QueryElasticsearchHttp.FIELDS, "transit_uri,version");
|
||||
runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "1");
|
||||
runner.assertValid();
|
||||
|
||||
runner.setIncomingConnection(false);
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_SUCCESS, 3);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(
|
||||
QueryElasticsearchHttp.REL_SUCCESS).get(0);
|
||||
assertNotNull(out);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchElasticsearchOnTrigger_IncomingFile() throws IOException {
|
||||
runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found
|
||||
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
|
||||
"http://localhost.internal:9200");
|
||||
|
||||
runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(QueryElasticsearchHttp.TYPE, "provenance");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(QueryElasticsearchHttp.QUERY, "${query}");
|
||||
runner.setProperty(QueryElasticsearchHttp.SORT, "timestamp:asc");
|
||||
runner.setProperty(QueryElasticsearchHttp.FIELDS, "transit_uri,version");
|
||||
runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "1");
|
||||
runner.assertValid();
|
||||
|
||||
Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put("query", "identifier:2f79eba8839f5976cd0b1e16a0e7fe8d7dd0ceca");
|
||||
runner.enqueue("".getBytes(), attributes);
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_SUCCESS, 3);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(
|
||||
QueryElasticsearchHttp.REL_SUCCESS).get(0);
|
||||
assertNotNull(out);
|
||||
}
|
||||
}
|
|
@ -1,63 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.elasticsearch;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ITScrollElasticsearchHttp {
|
||||
|
||||
private TestRunner runner;
|
||||
|
||||
@After
|
||||
public void teardown() {
|
||||
runner = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchElasticsearchOnTrigger() throws IOException {
|
||||
runner = TestRunners.newTestRunner(ScrollElasticsearchHttp.class); // all docs are found
|
||||
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
|
||||
"http://ip-172-31-49-152.ec2.internal:9200");
|
||||
|
||||
runner.setProperty(ScrollElasticsearchHttp.INDEX, "prod-accounting");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(ScrollElasticsearchHttp.TYPE, "provenance");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(ScrollElasticsearchHttp.QUERY,
|
||||
"identifier:2f79eba8839f5976cd0b1e16a0e7fe8d7dd0ceca");
|
||||
runner.setProperty(ScrollElasticsearchHttp.SORT, "timestamp:asc");
|
||||
runner.setProperty(ScrollElasticsearchHttp.FIELDS, "transit_uri,version");
|
||||
runner.setProperty(ScrollElasticsearchHttp.PAGE_SIZE, "1");
|
||||
runner.assertValid();
|
||||
|
||||
runner.setIncomingConnection(false);
|
||||
runner.run(4, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(ScrollElasticsearchHttp.REL_SUCCESS, 3);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(
|
||||
ScrollElasticsearchHttp.REL_SUCCESS).get(0);
|
||||
assertNotNull(out);
|
||||
}
|
||||
}
|
|
@ -1,276 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.processors.elasticsearch;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
import org.apache.nifi.serialization.record.MapRecord;
|
||||
import org.apache.nifi.serialization.record.MockRecordParser;
|
||||
import org.apache.nifi.serialization.record.RecordField;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class PutElasticsearchHttpRecordIT {
|
||||
protected TestRunner runner;
|
||||
private MockRecordParser recordReader;
|
||||
static RecordSchema personSchema;
|
||||
static TestRunner FETCH_RUNNER;
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
@BeforeClass
|
||||
public static void setupTests() throws Exception {
|
||||
final List<RecordField> personFields = new ArrayList<>();
|
||||
final RecordField nameField = new RecordField("name", RecordFieldType.STRING.getDataType());
|
||||
final RecordField ageField = new RecordField("age", RecordFieldType.INT.getDataType());
|
||||
final RecordField sportField = new RecordField("sport", RecordFieldType.STRING.getDataType());
|
||||
personFields.add(nameField);
|
||||
personFields.add(ageField);
|
||||
personFields.add(sportField);
|
||||
personSchema = new SimpleRecordSchema(personFields);
|
||||
|
||||
FETCH_RUNNER = TestRunners.newTestRunner(FetchElasticsearchHttp.class);
|
||||
FETCH_RUNNER.setProperty(FetchElasticsearchHttp.ES_URL, "http://localhost:9200");
|
||||
FETCH_RUNNER.setProperty(FetchElasticsearchHttp.INDEX, "people_test");
|
||||
FETCH_RUNNER.setProperty(FetchElasticsearchHttp.TYPE, "person");
|
||||
FETCH_RUNNER.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
|
||||
FETCH_RUNNER.assertValid();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
recordReader = new MockRecordParser();
|
||||
recordReader.addSchemaField("id", RecordFieldType.INT);
|
||||
|
||||
recordReader.addSchemaField("person", RecordFieldType.RECORD);
|
||||
|
||||
runner = TestRunners.newTestRunner(PutElasticsearchHttpRecord.class);
|
||||
runner.addControllerService("reader", recordReader);
|
||||
runner.enableControllerService(recordReader);
|
||||
runner.setProperty(PutElasticsearchHttpRecord.RECORD_READER, "reader");
|
||||
runner.setProperty(PutElasticsearchHttpRecord.ES_URL, "http://localhost:9200");
|
||||
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people_test");
|
||||
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "person");
|
||||
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
|
||||
runner.assertValid();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
FETCH_RUNNER.clearTransferState();
|
||||
}
|
||||
|
||||
private void setupPut() {
|
||||
runner.enqueue("");
|
||||
runner.run(1, true, true);
|
||||
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 0);
|
||||
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_RETRY, 0);
|
||||
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
private void testFetch(List<Map<String, String>> attrs) {
|
||||
for (Map<String, String> attr : attrs) {
|
||||
FETCH_RUNNER.enqueue("", attr);
|
||||
}
|
||||
|
||||
FETCH_RUNNER.run(attrs.size(), true, true);
|
||||
FETCH_RUNNER.assertTransferCount(FetchElasticsearchHttp.REL_FAILURE, 0);
|
||||
FETCH_RUNNER.assertTransferCount(FetchElasticsearchHttp.REL_RETRY, 0);
|
||||
FETCH_RUNNER.assertTransferCount(FetchElasticsearchHttp.REL_NOT_FOUND, 0);
|
||||
FETCH_RUNNER.assertTransferCount(FetchElasticsearchHttp.REL_SUCCESS, attrs.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoNullSuppresion() throws Exception {
|
||||
recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
|
||||
put("name", "John Doe");
|
||||
put("age", 48);
|
||||
put("sport", null);
|
||||
}}));
|
||||
|
||||
List<Map<String, String>> attrs = new ArrayList<>();
|
||||
Map<String, String> attr = new HashMap<>();
|
||||
attr.put("doc_id", "1");
|
||||
attrs.add(attr);
|
||||
|
||||
setupPut();
|
||||
testFetch(attrs);
|
||||
|
||||
byte[] raw = FETCH_RUNNER.getContentAsByteArray(FETCH_RUNNER.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0));
|
||||
String val = new String(raw);
|
||||
|
||||
Map<String, Object> parsed = mapper.readValue(val, Map.class);
|
||||
Assert.assertNotNull(parsed);
|
||||
Map<String, Object> person = (Map)parsed.get("person");
|
||||
Assert.assertNotNull(person);
|
||||
Assert.assertTrue(person.containsKey("sport"));
|
||||
Assert.assertNull(person.get("sport"));
|
||||
}
|
||||
|
||||
private void sharedSuppressTest(SharedPostTest spt) throws Exception {
|
||||
List<Map<String, String>> attrs = new ArrayList<>();
|
||||
Map<String, String> attr = new HashMap<>();
|
||||
attr.put("doc_id", "1");
|
||||
attrs.add(attr);
|
||||
attr = new HashMap<>();
|
||||
attr.put("doc_id", "2");
|
||||
attrs.add(attr);
|
||||
|
||||
setupPut();
|
||||
testFetch(attrs);
|
||||
|
||||
List<MockFlowFile> flowFiles = FETCH_RUNNER.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS);
|
||||
String ff1 = new String(FETCH_RUNNER.getContentAsByteArray(flowFiles.get(0)));
|
||||
String ff2 = new String(FETCH_RUNNER.getContentAsByteArray(flowFiles.get(1)));
|
||||
Map<String, Object> ff1Map = mapper.readValue(ff1, Map.class);
|
||||
Map<String, Object> ff2Map = mapper.readValue(ff2, Map.class);
|
||||
Assert.assertNotNull(ff1Map);
|
||||
Assert.assertNotNull(ff2Map);
|
||||
Map<String, Object> p1 = (Map)ff1Map.get("person");
|
||||
Map<String, Object> p2 = (Map)ff2Map.get("person");
|
||||
Assert.assertNotNull(p1);
|
||||
Assert.assertNotNull(p2);
|
||||
|
||||
spt.run(p1, p2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMissingRecord() throws Exception {
|
||||
recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
|
||||
put("name", "John Doe");
|
||||
put("age", 48);
|
||||
}}));
|
||||
|
||||
recordReader.addRecord(2, new MapRecord(personSchema, new HashMap<String,Object>() {{
|
||||
put("name", "John Doe");
|
||||
put("age", 48);
|
||||
put("sport", null);
|
||||
}}));
|
||||
|
||||
runner.setProperty(PutElasticsearchHttpRecord.SUPPRESS_NULLS, PutElasticsearchHttpRecord.SUPPRESS_MISSING);
|
||||
|
||||
sharedSuppressTest((p1, p2) -> {
|
||||
Assert.assertFalse(p1.containsKey("sport"));
|
||||
Assert.assertTrue(p2.containsKey("sport"));
|
||||
Assert.assertNull(p2.get("sport"));
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAlwaysSuppress() throws Exception {
|
||||
recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
|
||||
put("name", "John Doe");
|
||||
put("age", 48);
|
||||
}}));
|
||||
|
||||
recordReader.addRecord(2, new MapRecord(personSchema, new HashMap<String,Object>() {{
|
||||
put("name", "John Doe");
|
||||
put("age", 48);
|
||||
put("sport", null);
|
||||
}}));
|
||||
|
||||
runner.setProperty(PutElasticsearchHttpRecord.SUPPRESS_NULLS, PutElasticsearchHttpRecord.ALWAYS_SUPPRESS);
|
||||
|
||||
sharedSuppressTest((p1, p2) -> {
|
||||
Assert.assertFalse(p1.containsKey("sport"));
|
||||
Assert.assertFalse(p2.containsKey("sport"));
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIllegalIndexName() throws Exception {
|
||||
// Undo some stuff from setup()
|
||||
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people\"test");
|
||||
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "person");
|
||||
recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
|
||||
put("name", "John Doe");
|
||||
put("age", 48);
|
||||
put("sport", null);
|
||||
}}));
|
||||
|
||||
List<Map<String, String>> attrs = new ArrayList<>();
|
||||
Map<String, String> attr = new HashMap<>();
|
||||
attr.put("doc_id", "1");
|
||||
attrs.add(attr);
|
||||
|
||||
runner.enqueue("");
|
||||
runner.run(1, true, true);
|
||||
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 1);
|
||||
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_RETRY, 0);
|
||||
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIndexNameWithJsonChar() throws Exception {
|
||||
// Undo some stuff from setup()
|
||||
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people}test");
|
||||
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "person");
|
||||
recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
|
||||
put("name", "John Doe");
|
||||
put("age", 48);
|
||||
put("sport", null);
|
||||
}}));
|
||||
|
||||
List<Map<String, String>> attrs = new ArrayList<>();
|
||||
Map<String, String> attr = new HashMap<>();
|
||||
attr.put("doc_id", "1");
|
||||
attrs.add(attr);
|
||||
|
||||
runner.enqueue("");
|
||||
runner.run(1, true, true);
|
||||
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_FAILURE, 0);
|
||||
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_RETRY, 0);
|
||||
runner.assertTransferCount(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTypeNameWithSpecialChars() throws Exception {
|
||||
// Undo some stuff from setup()
|
||||
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people_test2");
|
||||
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "per\"son");
|
||||
recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
|
||||
put("name", "John Doe");
|
||||
put("age", 48);
|
||||
put("sport", null);
|
||||
}}));
|
||||
|
||||
List<Map<String, String>> attrs = new ArrayList<>();
|
||||
Map<String, String> attr = new HashMap<>();
|
||||
attr.put("doc_id", "1");
|
||||
attrs.add(attr);
|
||||
|
||||
setupPut();
|
||||
}
|
||||
|
||||
private interface SharedPostTest {
|
||||
void run(Map<String, Object> p1, Map<String, Object> p2);
|
||||
}
|
||||
}
|
|
@ -1,446 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.elasticsearch;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.MalformedURLException;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.provenance.ProvenanceEventType;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.MockProcessContext;
|
||||
import org.apache.nifi.util.MockProcessorInitializationContext;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ListenableActionFuture;
|
||||
import org.elasticsearch.action.get.GetAction;
|
||||
import org.elasticsearch.action.get.GetRequestBuilder;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.support.AdapterActionFuture;
|
||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestFetchElasticsearch {
|
||||
|
||||
private InputStream docExample;
|
||||
private TestRunner runner;
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
ClassLoader classloader = Thread.currentThread().getContextClassLoader();
|
||||
docExample = classloader.getResourceAsStream("DocumentExample.json");
|
||||
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() {
|
||||
runner = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchElasticsearchOnTrigger() throws IOException {
|
||||
runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(true)); // all docs are found
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s");
|
||||
|
||||
runner.setProperty(FetchElasticsearch.INDEX, "doc");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(FetchElasticsearch.TYPE, "status");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_SUCCESS, 1);
|
||||
assertFalse(runner.getProvenanceEvents().isEmpty());
|
||||
runner.getProvenanceEvents().forEach(event -> assertEquals(event.getEventType(), ProvenanceEventType.FETCH) );
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearch.REL_SUCCESS).get(0);
|
||||
assertNotNull(out);
|
||||
out.assertAttributeEquals("doc_id", "28039652140");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchElasticsearchOnTriggerEL() throws IOException {
|
||||
runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(true)); // all docs are found
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "${cluster.name}");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "${hosts}");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "${ping.timeout}");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "${sampler.interval}");
|
||||
|
||||
runner.setProperty(FetchElasticsearch.INDEX, "doc");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(FetchElasticsearch.TYPE, "status");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
|
||||
runner.assertValid();
|
||||
runner.setVariable("cluster.name", "elasticsearch");
|
||||
runner.setVariable("hosts", "127.0.0.1:9300");
|
||||
runner.setVariable("ping.timeout", "5s");
|
||||
runner.setVariable("sampler.interval", "5s");
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_SUCCESS, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearch.REL_SUCCESS).get(0);
|
||||
assertNotNull(out);
|
||||
out.assertAttributeEquals("doc_id", "28039652140");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchElasticsearchOnTriggerWithFailures() throws IOException {
|
||||
runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(false)); // simulate doc not found
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s");
|
||||
runner.setProperty(FetchElasticsearch.INDEX, "doc");
|
||||
runner.setProperty(FetchElasticsearch.TYPE, "status");
|
||||
runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
// This test generates a "document not found"
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_NOT_FOUND, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearch.REL_NOT_FOUND).get(0);
|
||||
assertNotNull(out);
|
||||
out.assertAttributeEquals("doc_id", "28039652140");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchElasticsearchWithBadHosts() throws IOException {
|
||||
runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(false)); // simulate doc not found
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "http://127.0.0.1:9300,127.0.0.2:9300");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s");
|
||||
runner.setProperty(FetchElasticsearch.INDEX, "doc");
|
||||
runner.setProperty(FetchElasticsearch.TYPE, "status");
|
||||
runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
|
||||
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchElasticsearchOnTriggerWithExceptions() throws IOException {
|
||||
FetchElasticsearchTestProcessor processor = new FetchElasticsearchTestProcessor(true);
|
||||
runner = TestRunners.newTestRunner(processor);
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s");
|
||||
runner.setProperty(FetchElasticsearch.INDEX, "doc");
|
||||
runner.setProperty(FetchElasticsearch.TYPE, "status");
|
||||
runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
|
||||
|
||||
// No Node Available exception
|
||||
processor.setExceptionToThrow(new NoNodeAvailableException("test"));
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
|
||||
runner.clearTransferState();
|
||||
|
||||
// Elasticsearch Timeout exception
|
||||
processor.setExceptionToThrow(new ElasticsearchTimeoutException("test"));
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652141");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
|
||||
runner.clearTransferState();
|
||||
|
||||
// Receive Timeout Transport exception
|
||||
processor.setExceptionToThrow(new ReceiveTimeoutTransportException(mock(StreamInput.class)));
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652141");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
|
||||
runner.clearTransferState();
|
||||
|
||||
// Node Closed exception
|
||||
processor.setExceptionToThrow(new NodeClosedException(mock(StreamInput.class)));
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652141");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
|
||||
runner.clearTransferState();
|
||||
|
||||
// Elasticsearch Parse exception
|
||||
processor.setExceptionToThrow(new ElasticsearchParseException("test"));
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652141");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
// This test generates an exception on execute(),routes to failure
|
||||
runner.assertTransferCount(FetchElasticsearch.REL_FAILURE, 1);
|
||||
}
|
||||
|
||||
@Test(expected = ProcessException.class)
|
||||
public void testCreateElasticsearchClientWithException() throws ProcessException {
|
||||
FetchElasticsearchTestProcessor processor = new FetchElasticsearchTestProcessor(true) {
|
||||
@Override
|
||||
protected TransportClient getTransportClient(Settings.Builder settingsBuilder, String shieldUrl,
|
||||
String username, String password)
|
||||
throws MalformedURLException {
|
||||
throw new MalformedURLException();
|
||||
}
|
||||
};
|
||||
|
||||
MockProcessContext context = new MockProcessContext(processor);
|
||||
processor.initialize(new MockProcessorInitializationContext(processor, context));
|
||||
processor.callCreateElasticsearchClient(context);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetupSecureClient() throws Exception {
|
||||
FetchElasticsearchTestProcessor processor = new FetchElasticsearchTestProcessor(true);
|
||||
runner = TestRunners.newTestRunner(processor);
|
||||
SSLContextService sslService = mock(SSLContextService.class);
|
||||
when(sslService.getIdentifier()).thenReturn("ssl-context");
|
||||
runner.addControllerService("ssl-context", sslService);
|
||||
runner.enableControllerService(sslService);
|
||||
runner.setProperty(FetchElasticsearch.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s");
|
||||
runner.setProperty(FetchElasticsearch.INDEX, "doc");
|
||||
runner.setProperty(FetchElasticsearch.TYPE, "status");
|
||||
runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
|
||||
|
||||
// Allow time for the controller service to fully initialize
|
||||
Thread.sleep(500);
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A Test class that extends the processor in order to inject/mock behavior
|
||||
*/
|
||||
private static class FetchElasticsearchTestProcessor extends FetchElasticsearch {
|
||||
boolean documentExists = true;
|
||||
Exception exceptionToThrow = null;
|
||||
|
||||
public FetchElasticsearchTestProcessor(boolean documentExists) {
|
||||
this.documentExists = documentExists;
|
||||
}
|
||||
|
||||
public void setExceptionToThrow(Exception exceptionToThrow) {
|
||||
this.exceptionToThrow = exceptionToThrow;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TransportClient getTransportClient(Settings.Builder settingsBuilder, String shieldUrl,
|
||||
String username, String password)
|
||||
throws MalformedURLException {
|
||||
TransportClient mockClient = mock(TransportClient.class);
|
||||
GetRequestBuilder getRequestBuilder = spy(new GetRequestBuilder(mockClient, GetAction.INSTANCE));
|
||||
if (exceptionToThrow != null) {
|
||||
doThrow(exceptionToThrow).when(getRequestBuilder).execute();
|
||||
} else {
|
||||
doReturn(new MockGetRequestBuilderExecutor(documentExists)).when(getRequestBuilder).execute();
|
||||
}
|
||||
when(mockClient.prepareGet(anyString(), anyString(), anyString())).thenReturn(getRequestBuilder);
|
||||
|
||||
return mockClient;
|
||||
}
|
||||
|
||||
public void callCreateElasticsearchClient(ProcessContext context) {
|
||||
createElasticsearchClient(context);
|
||||
}
|
||||
|
||||
private static class MockGetRequestBuilderExecutor
|
||||
extends AdapterActionFuture<GetResponse, ActionListener<GetResponse>>
|
||||
implements ListenableActionFuture<GetResponse> {
|
||||
|
||||
boolean documentExists = true;
|
||||
|
||||
public MockGetRequestBuilderExecutor(boolean documentExists) {
|
||||
this.documentExists = documentExists;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected GetResponse convert(ActionListener<GetResponse> bulkResponseActionListener) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addListener(ActionListener<GetResponse> actionListener) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetResponse get() throws InterruptedException, ExecutionException {
|
||||
GetResponse response = mock(GetResponse.class);
|
||||
when(response.isExists()).thenReturn(documentExists);
|
||||
when(response.getSourceAsBytes()).thenReturn("Success".getBytes());
|
||||
when(response.getSourceAsString()).thenReturn("Success");
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetResponse actionGet() {
|
||||
try {
|
||||
return get();
|
||||
} catch (Exception e) {
|
||||
fail(e.getMessage());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Integration test section below
|
||||
//
|
||||
// The tests below are meant to run on real ES instances, and are thus @Ignored during normal test execution.
|
||||
// However if you wish to execute them as part of a test phase, comment out the @Ignored line for each
|
||||
// desired test.
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/**
|
||||
* Tests basic ES functionality against a local or test ES cluster
|
||||
*/
|
||||
@Test
|
||||
@Ignore("Comment this out if you want to run against local or test ES")
|
||||
public void testFetchElasticsearchBasic() {
|
||||
System.out.println("Starting test " + new Object() {
|
||||
}.getClass().getEnclosingMethod().getName());
|
||||
final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearch());
|
||||
|
||||
//Local Cluster - Mac pulled from brew
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch_brew");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s");
|
||||
|
||||
runner.setProperty(FetchElasticsearch.INDEX, "doc");
|
||||
|
||||
runner.setProperty(FetchElasticsearch.TYPE, "status");
|
||||
runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
|
||||
|
||||
runner.enqueue(docExample);
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("Comment this out if you want to run against local or test ES")
|
||||
public void testFetchElasticsearchBatch() throws IOException {
|
||||
System.out.println("Starting test " + new Object() {
|
||||
}.getClass().getEnclosingMethod().getName());
|
||||
final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearch());
|
||||
|
||||
//Local Cluster - Mac pulled from brew
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch_brew");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s");
|
||||
runner.setProperty(FetchElasticsearch.INDEX, "doc");
|
||||
|
||||
runner.setProperty(FetchElasticsearch.TYPE, "status");
|
||||
runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
|
||||
runner.assertValid();
|
||||
|
||||
|
||||
String message = convertStreamToString(docExample);
|
||||
for (int i = 0; i < 100; i++) {
|
||||
|
||||
long newId = 28039652140L + i;
|
||||
final String newStrId = Long.toString(newId);
|
||||
runner.enqueue(message.getBytes(), new HashMap<String, String>() {{
|
||||
put("doc_id", newStrId);
|
||||
}});
|
||||
|
||||
}
|
||||
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_SUCCESS, 100);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert an input stream to a stream
|
||||
*
|
||||
* @param is input the input stream
|
||||
* @return return the converted input stream as a string
|
||||
*/
|
||||
static String convertStreamToString(InputStream is) {
|
||||
java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
|
||||
return s.hasNext() ? s.next() : "";
|
||||
}
|
||||
}
|
|
@ -26,10 +26,8 @@ import static org.mockito.Mockito.when;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
|
@ -38,7 +36,6 @@ import org.apache.nifi.util.TestRunner;
|
|||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
@ -424,100 +421,4 @@ public class TestFetchElasticsearchHttp {
|
|||
return client;
|
||||
}
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Integration test section below
|
||||
//
|
||||
// The tests below are meant to run on real ES instances, and are thus @Ignored during normal test execution.
|
||||
// However if you wish to execute them as part of a test phase, comment out the @Ignored line for each
|
||||
// desired test.
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/**
|
||||
* Tests basic ES functionality against a local or test ES cluster
|
||||
*/
|
||||
@Test
|
||||
@Ignore("Comment this out if you want to run against local or test ES")
|
||||
public void testFetchElasticsearchBasic() throws IOException {
|
||||
System.out.println("Starting test " + new Object() {
|
||||
}.getClass().getEnclosingMethod().getName());
|
||||
final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp());
|
||||
|
||||
// add data to ES instance
|
||||
new OkHttpClient.Builder().build().newCall(
|
||||
new Request.Builder().url("http://127.0.0.1:9200/doc/_doc/28039652140")
|
||||
.addHeader("Content-Type", "application/json")
|
||||
.put(
|
||||
RequestBody.create(MediaType.get("application/json"),
|
||||
IOUtils.toString(docExample, StandardCharsets.UTF_8))
|
||||
).build()
|
||||
).execute();
|
||||
|
||||
//Local Cluster - Mac pulled from brew
|
||||
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
|
||||
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
|
||||
runner.removeProperty(FetchElasticsearchHttp.TYPE);
|
||||
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
|
||||
runner.enqueue(docExample);
|
||||
runner.run(1, true, true);
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("Un-authenticated proxy : Comment this out if you want to run against local proxied ES.")
|
||||
public void testFetchElasticsearchBasicBehindProxy() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp());
|
||||
runner.setValidateExpressionUsage(true);
|
||||
|
||||
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
|
||||
runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
|
||||
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
|
||||
|
||||
runner.setProperty(FetchElasticsearchHttp.PROXY_HOST, "localhost");
|
||||
runner.setProperty(FetchElasticsearchHttp.PROXY_PORT, "3228");
|
||||
runner.setProperty(FetchElasticsearchHttp.ES_URL, "http://172.18.0.2:9200");
|
||||
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
|
||||
runner.enqueue(docExample);
|
||||
runner.run(1, true, true);
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("Authenticated Proxy : Comment this out if you want to run against local proxied ES.")
|
||||
public void testFetchElasticsearchBasicBehindAuthenticatedProxy() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp());
|
||||
runner.setValidateExpressionUsage(true);
|
||||
|
||||
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
|
||||
runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
|
||||
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
|
||||
|
||||
runner.setProperty(FetchElasticsearchHttp.PROXY_HOST, "localhost");
|
||||
runner.setProperty(FetchElasticsearchHttp.PROXY_PORT, "3328");
|
||||
runner.setProperty(FetchElasticsearchHttp.PROXY_USERNAME, "squid");
|
||||
runner.setProperty(FetchElasticsearchHttp.PROXY_PASSWORD, "changeme");
|
||||
runner.setProperty(FetchElasticsearchHttp.ES_URL, "http://172.18.0.2:9200");
|
||||
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
|
||||
runner.enqueue(docExample);
|
||||
runner.run(1, true, true);
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,491 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.elasticsearch;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.provenance.ProvenanceEventType;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ListenableActionFuture;
|
||||
import org.elasticsearch.action.bulk.BulkAction;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.index.IndexAction;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.support.AdapterActionFuture;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
public class TestPutElasticsearch {
|
||||
|
||||
private InputStream docExample;
|
||||
private TestRunner runner;
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
ClassLoader classloader = Thread.currentThread().getContextClassLoader();
|
||||
docExample = classloader.getResourceAsStream("DocumentExample.json");
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() {
|
||||
runner = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutElasticSearchOnTrigger() throws IOException {
|
||||
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s");
|
||||
|
||||
runner.setProperty(PutElasticsearch.INDEX, "doc");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(PutElasticsearch.TYPE, "status");
|
||||
runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
|
||||
assertFalse(runner.getProvenanceEvents().isEmpty());
|
||||
runner.getProvenanceEvents().forEach(event -> assertEquals(event.getEventType(), ProvenanceEventType.SEND));
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
|
||||
assertNotNull(out);
|
||||
out.assertAttributeEquals("doc_id", "28039652140");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutElasticSearchOnTriggerEL() throws IOException {
|
||||
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "${cluster.name}");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "${hosts}");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "${ping.timeout}");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "${sampler.interval}");
|
||||
|
||||
runner.setProperty(PutElasticsearch.INDEX, "doc");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(PutElasticsearch.TYPE, "status");
|
||||
runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
|
||||
runner.assertValid();
|
||||
runner.setVariable("cluster.name", "elasticsearch");
|
||||
runner.setVariable("hosts", "127.0.0.1:9300");
|
||||
runner.setVariable("ping.timeout", "5s");
|
||||
runner.setVariable("sampler.interval", "5s");
|
||||
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
|
||||
assertNotNull(out);
|
||||
out.assertAttributeEquals("doc_id", "28039652140");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutElasticSearchOnTriggerWithFailures() throws IOException {
|
||||
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s");
|
||||
runner.setProperty(PutElasticsearch.INDEX, "doc");
|
||||
runner.setProperty(PutElasticsearch.TYPE, "status");
|
||||
runner.setProperty(PutElasticsearch.BATCH_SIZE, "2");
|
||||
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652141");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertTransferCount(PutElasticsearch.REL_FAILURE, 1);
|
||||
runner.assertTransferCount(PutElasticsearch.REL_SUCCESS, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_FAILURE).get(0);
|
||||
assertNotNull(out);
|
||||
out.assertAttributeEquals("doc_id", "28039652140");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutElasticsearchOnTriggerWithExceptions() throws IOException {
|
||||
PutElasticsearchTestProcessor processor = new PutElasticsearchTestProcessor(false);
|
||||
runner = TestRunners.newTestRunner(processor);
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s");
|
||||
runner.setProperty(PutElasticsearch.INDEX, "doc");
|
||||
runner.setProperty(PutElasticsearch.TYPE, "status");
|
||||
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
|
||||
|
||||
// No Node Available exception
|
||||
processor.setExceptionToThrow(new NoNodeAvailableException("test"));
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
|
||||
runner.clearTransferState();
|
||||
|
||||
// Elasticsearch Timeout exception
|
||||
processor.setExceptionToThrow(new ElasticsearchTimeoutException("test"));
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652141");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
|
||||
runner.clearTransferState();
|
||||
|
||||
// Receive Timeout Transport exception
|
||||
processor.setExceptionToThrow(new ReceiveTimeoutTransportException(mock(StreamInput.class)));
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652142");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
|
||||
runner.clearTransferState();
|
||||
|
||||
// Node Closed exception
|
||||
processor.setExceptionToThrow(new NodeClosedException(mock(StreamInput.class)));
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652143");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_RETRY, 1);
|
||||
runner.clearTransferState();
|
||||
|
||||
// Elasticsearch Parse exception
|
||||
processor.setExceptionToThrow(new ElasticsearchParseException("test"));
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652144");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
// This test generates an exception on execute(),routes to failure
|
||||
runner.assertTransferCount(PutElasticsearch.REL_FAILURE, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutElasticsearchOnTriggerWithNoIdAttribute() throws IOException {
|
||||
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s");
|
||||
runner.setProperty(PutElasticsearch.INDEX, "doc");
|
||||
runner.setProperty(PutElasticsearch.TYPE, "status");
|
||||
runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
|
||||
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
|
||||
|
||||
runner.enqueue(docExample);
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_FAILURE, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_FAILURE).get(0);
|
||||
assertNotNull(out);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutElasticsearchOnTriggerWithIndexFromAttribute() throws IOException {
|
||||
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false));
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s");
|
||||
runner.setProperty(PutElasticsearch.INDEX, "${i}");
|
||||
runner.setProperty(PutElasticsearch.TYPE, "${type}");
|
||||
runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
|
||||
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652144");
|
||||
put("i", "doc");
|
||||
put("type", "status");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
|
||||
assertNotNull(out);
|
||||
runner.clearTransferState();
|
||||
|
||||
// Now try an empty attribute value, should fail
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652144");
|
||||
put("type", "status");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_RETRY, 1);
|
||||
final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutElasticsearch.REL_RETRY).get(0);
|
||||
assertNotNull(out2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutElasticSearchOnTriggerWithInvalidIndexOp() throws IOException {
|
||||
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s");
|
||||
|
||||
runner.setProperty(PutElasticsearch.INDEX, "doc");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(PutElasticsearch.TYPE, "status");
|
||||
runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
|
||||
runner.assertNotValid();
|
||||
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
|
||||
runner.assertValid();
|
||||
|
||||
runner.setProperty(PutElasticsearch.INDEX_OP, "index_fail");
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_FAILURE, 1);
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_FAILURE).get(0);
|
||||
assertNotNull(out);
|
||||
}
|
||||
|
||||
/**
|
||||
* A Test class that extends the processor in order to inject/mock behavior
|
||||
*/
|
||||
private static class PutElasticsearchTestProcessor extends PutElasticsearch {
|
||||
boolean responseHasFailures = false;
|
||||
Exception exceptionToThrow = null;
|
||||
|
||||
public PutElasticsearchTestProcessor(boolean responseHasFailures) {
|
||||
this.responseHasFailures = responseHasFailures;
|
||||
}
|
||||
|
||||
public void setExceptionToThrow(Exception exceptionToThrow) {
|
||||
this.exceptionToThrow = exceptionToThrow;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createElasticsearchClient(ProcessContext context) throws ProcessException {
|
||||
final Client mockClient = mock(Client.class);
|
||||
BulkRequestBuilder bulkRequestBuilder = spy(new BulkRequestBuilder(mockClient, BulkAction.INSTANCE));
|
||||
if (exceptionToThrow != null) {
|
||||
doThrow(exceptionToThrow).when(bulkRequestBuilder).execute();
|
||||
} else {
|
||||
doReturn(new MockBulkRequestBuilderExecutor(responseHasFailures)).when(bulkRequestBuilder).execute();
|
||||
}
|
||||
when(mockClient.prepareBulk()).thenReturn(bulkRequestBuilder);
|
||||
|
||||
when(mockClient.prepareIndex(anyString(), anyString(), anyString())).thenAnswer(new Answer<IndexRequestBuilder>() {
|
||||
@Override
|
||||
public IndexRequestBuilder answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||
Object[] args = invocationOnMock.getArguments();
|
||||
String arg1 = (String) args[0];
|
||||
if (arg1.isEmpty()) {
|
||||
throw new NoNodeAvailableException("Needs index");
|
||||
}
|
||||
String arg2 = (String) args[1];
|
||||
if (arg2.isEmpty()) {
|
||||
throw new NoNodeAvailableException("Needs doc type");
|
||||
} else {
|
||||
IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(mockClient, IndexAction.INSTANCE);
|
||||
return indexRequestBuilder;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
esClient.set(mockClient);
|
||||
}
|
||||
|
||||
private static class MockBulkRequestBuilderExecutor
|
||||
extends AdapterActionFuture<BulkResponse, ActionListener<BulkResponse>>
|
||||
implements ListenableActionFuture<BulkResponse> {
|
||||
|
||||
boolean responseHasFailures = false;
|
||||
|
||||
public MockBulkRequestBuilderExecutor(boolean responseHasFailures) {
|
||||
this.responseHasFailures = responseHasFailures;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BulkResponse convert(ActionListener<BulkResponse> bulkResponseActionListener) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addListener(ActionListener<BulkResponse> actionListener) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkResponse get() throws InterruptedException, ExecutionException {
|
||||
BulkResponse response = mock(BulkResponse.class);
|
||||
when(response.hasFailures()).thenReturn(responseHasFailures);
|
||||
BulkItemResponse item1 = mock(BulkItemResponse.class);
|
||||
BulkItemResponse item2 = mock(BulkItemResponse.class);
|
||||
when(item1.getItemId()).thenReturn(1);
|
||||
when(item1.isFailed()).thenReturn(true);
|
||||
BulkItemResponse.Failure failure = mock(BulkItemResponse.Failure.class);
|
||||
when(failure.getMessage()).thenReturn("Bad message");
|
||||
when(item1.getFailure()).thenReturn(failure);
|
||||
when(item2.getItemId()).thenReturn(2);
|
||||
when(item2.isFailed()).thenReturn(false);
|
||||
when(response.getItems()).thenReturn(new BulkItemResponse[]{item1, item2});
|
||||
return response;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Integration test section below
|
||||
//
|
||||
// The tests below are meant to run on real ES instances, and are thus @Ignored during normal test execution.
|
||||
// However if you wish to execute them as part of a test phase, comment out the @Ignored line for each
|
||||
// desired test.
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/**
|
||||
* Tests basic ES functionality against a local or test ES cluster
|
||||
*/
|
||||
@Test
|
||||
@Ignore("Comment this out if you want to run against local or test ES")
|
||||
public void testPutElasticSearchBasic() {
|
||||
System.out.println("Starting test " + new Object() {
|
||||
}.getClass().getEnclosingMethod().getName());
|
||||
final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearch());
|
||||
|
||||
//Local Cluster - Mac pulled from brew
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch_brew");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s");
|
||||
|
||||
runner.setProperty(PutElasticsearch.INDEX, "doc");
|
||||
runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
|
||||
|
||||
runner.setProperty(PutElasticsearch.TYPE, "status");
|
||||
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
|
||||
|
||||
runner.enqueue(docExample);
|
||||
runner.run(1, true, true);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("Comment this out if you want to run against local or test ES")
|
||||
public void testPutElasticSearchBatch() throws IOException {
|
||||
System.out.println("Starting test " + new Object() {
|
||||
}.getClass().getEnclosingMethod().getName());
|
||||
final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearch());
|
||||
|
||||
//Local Cluster - Mac pulled from brew
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch_brew");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s");
|
||||
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s");
|
||||
runner.setProperty(PutElasticsearch.INDEX, "doc");
|
||||
runner.setProperty(PutElasticsearch.BATCH_SIZE, "100");
|
||||
|
||||
runner.setProperty(PutElasticsearch.TYPE, "status");
|
||||
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
|
||||
runner.assertValid();
|
||||
|
||||
|
||||
String message = convertStreamToString(docExample);
|
||||
for (int i = 0; i < 100; i++) {
|
||||
|
||||
long newId = 28039652140L + i;
|
||||
final String newStrId = Long.toString(newId);
|
||||
runner.enqueue(message.getBytes(), new HashMap<String, String>() {{
|
||||
put("doc_id", newStrId);
|
||||
}});
|
||||
|
||||
}
|
||||
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 100);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert an input stream to a stream
|
||||
*
|
||||
* @param is input the input stream
|
||||
* @return return the converted input stream as a string
|
||||
*/
|
||||
static String convertStreamToString(java.io.InputStream is) {
|
||||
java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
|
||||
return s.hasNext() ? s.next() : "";
|
||||
}
|
||||
}
|
|
@ -35,7 +35,6 @@ import org.apache.nifi.util.TestRunner;
|
|||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
@ -498,133 +497,4 @@ public class TestPutElasticsearchHttp {
|
|||
return client;
|
||||
}
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Integration test section below
|
||||
//
|
||||
// The tests below are meant to run on real ES instances, and are thus @Ignored during normal test execution.
|
||||
// However if you wish to execute them as part of a test phase, comment out the @Ignored line for each
|
||||
// desired test.
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/**
|
||||
* Tests basic ES functionality against a local or test ES cluster
|
||||
*/
|
||||
@Test
|
||||
@Ignore("Comment this out if you want to run against local or test ES")
|
||||
public void testPutElasticSearchBasic() {
|
||||
System.out.println("Starting test " + new Object() {
|
||||
}.getClass().getEnclosingMethod().getName());
|
||||
final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttp());
|
||||
|
||||
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
|
||||
runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
|
||||
runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
|
||||
runner.setProperty(PutElasticsearchHttp.TYPE, "status");
|
||||
runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
|
||||
runner.enqueue(docExample);
|
||||
runner.run(1, true, true);
|
||||
runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("Comment this out if you want to run against local or test ES")
|
||||
public void testPutElasticSearchBatch() throws IOException {
|
||||
System.out.println("Starting test " + new Object() {
|
||||
}.getClass().getEnclosingMethod().getName());
|
||||
final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttp());
|
||||
|
||||
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
|
||||
runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
|
||||
runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "100");
|
||||
runner.setProperty(PutElasticsearchHttp.TYPE, "status");
|
||||
runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
|
||||
runner.assertValid();
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
long newId = 28039652140L + i;
|
||||
final String newStrId = Long.toString(newId);
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", newStrId);
|
||||
}});
|
||||
}
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 100);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("Un-authenticated proxy : Comment this out if you want to run against local proxied ES.")
|
||||
public void testPutElasticSearchBasicBehindProxy() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttp());
|
||||
runner.setValidateExpressionUsage(false);
|
||||
|
||||
runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
|
||||
runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
|
||||
runner.setProperty(PutElasticsearchHttp.TYPE, "status");
|
||||
runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
|
||||
|
||||
runner.setProperty(PutElasticsearchHttp.PROXY_HOST, "localhost");
|
||||
runner.setProperty(PutElasticsearchHttp.PROXY_PORT, "3228");
|
||||
runner.setProperty(PutElasticsearchHttp.ES_URL, "http://172.18.0.2:9200");
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
|
||||
runner.enqueue(docExample);
|
||||
runner.run(1, true, true);
|
||||
runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("Authenticated Proxy : Comment this out if you want to run against local proxied ES.")
|
||||
public void testPutElasticSearchBasicBehindAuthenticatedProxy() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttp());
|
||||
runner.setValidateExpressionUsage(false);
|
||||
|
||||
runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
|
||||
runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
|
||||
runner.setProperty(PutElasticsearchHttp.TYPE, "status");
|
||||
runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
|
||||
|
||||
runner.setProperty(PutElasticsearchHttp.PROXY_HOST, "localhost");
|
||||
runner.setProperty(PutElasticsearchHttp.PROXY_PORT, "3328");
|
||||
runner.setProperty(PutElasticsearchHttp.PROXY_USERNAME, "squid");
|
||||
runner.setProperty(PutElasticsearchHttp.PROXY_PASSWORD, "changeme");
|
||||
runner.setProperty(PutElasticsearchHttp.ES_URL, "http://172.18.0.2:9200");
|
||||
|
||||
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
|
||||
runner.enqueue(docExample);
|
||||
runner.run(1, true, true);
|
||||
runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
@Test(expected = AssertionError.class)
|
||||
public void testPutElasticSearchBadHostInEL() throws IOException {
|
||||
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
|
||||
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
|
||||
|
||||
runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
|
||||
runner.setProperty(PutElasticsearchHttp.TYPE, "status");
|
||||
runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
|
||||
runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
|
||||
|
||||
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
runner.run(1, true, true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,14 +25,11 @@ import okhttp3.Request;
|
|||
import okhttp3.Response;
|
||||
import okhttp3.ResponseBody;
|
||||
import okio.Buffer;
|
||||
import org.apache.nifi.json.JsonTreeReader;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.provenance.ProvenanceEventRecord;
|
||||
import org.apache.nifi.provenance.ProvenanceEventType;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.schema.access.SchemaAccessUtils;
|
||||
import org.apache.nifi.serialization.RecordReaderFactory;
|
||||
import org.apache.nifi.serialization.record.MockRecordParser;
|
||||
import org.apache.nifi.serialization.record.MockRecordWriter;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
|
@ -40,7 +37,6 @@ import org.apache.nifi.util.MockFlowFile;
|
|||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.After;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -828,79 +824,6 @@ public class TestPutElasticsearchHttpRecord {
|
|||
}
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Integration test section below
|
||||
//
|
||||
// The tests below are meant to run on real ES instances, and are thus @Ignored during normal test execution.
|
||||
// However if you wish to execute them as part of a test phase, comment out the @Ignored line for each
|
||||
// desired test.
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/**
|
||||
* Tests basic ES functionality against a local or test ES cluster
|
||||
*/
|
||||
@Test
|
||||
@Ignore("Comment this out if you want to run against local or test ES")
|
||||
public void testPutElasticSearchBasic() throws InitializationException {
|
||||
System.out.println("Starting test " + new Object() {
|
||||
}.getClass().getEnclosingMethod().getName());
|
||||
final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecord());
|
||||
|
||||
final RecordReaderFactory reader = new JsonTreeReader();
|
||||
runner.addControllerService("reader", reader);
|
||||
runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
|
||||
runner.enableControllerService(reader);
|
||||
runner.setProperty(PutElasticsearchHttpRecord.RECORD_READER, "reader");
|
||||
|
||||
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
|
||||
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
|
||||
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "_doc");
|
||||
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
|
||||
runner.assertValid();
|
||||
|
||||
runner.enqueue("{\"id\": 28039652140}");
|
||||
|
||||
runner.enqueue(new byte[0]);
|
||||
runner.run(1, true, true);
|
||||
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
|
||||
List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
|
||||
assertNotNull(provEvents);
|
||||
assertEquals(1, provEvents.size());
|
||||
assertEquals(ProvenanceEventType.SEND, provEvents.get(0).getEventType());
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("Comment this out if you want to run against local or test ES")
|
||||
public void testPutElasticSearchBatch() throws InitializationException {
|
||||
System.out.println("Starting test " + new Object() {
|
||||
}.getClass().getEnclosingMethod().getName());
|
||||
final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecord());
|
||||
|
||||
final RecordReaderFactory reader = new JsonTreeReader();
|
||||
runner.addControllerService("reader", reader);
|
||||
runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
|
||||
runner.enableControllerService(reader);
|
||||
runner.setProperty(PutElasticsearchHttpRecord.RECORD_READER, "reader");
|
||||
|
||||
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
|
||||
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
|
||||
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "_doc");
|
||||
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
|
||||
runner.assertValid();
|
||||
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
for (int i = 0; i < 100; i++) {
|
||||
long newId = 28039652140L + i;
|
||||
final String newStrId = Long.toString(newId);
|
||||
sb.append("{\"id\": ").append(newStrId).append("}\n");
|
||||
}
|
||||
runner.enqueue(sb.toString().getBytes());
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
|
||||
final String content = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0).getContent();
|
||||
assertEquals(sb.toString(), content);
|
||||
}
|
||||
|
||||
@Test(expected = AssertionError.class)
|
||||
public void testPutElasticSearchBadHostInEL() {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecord());
|
||||
|
|
|
@ -37,7 +37,6 @@ import org.apache.nifi.util.MockFlowFile;
|
|||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.After;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
@ -380,65 +379,6 @@ public class TestQueryElasticsearchHttp {
|
|||
runner.run(1, true, true);
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// Integration test section below
|
||||
//
|
||||
// The tests below are meant to run on real ES instances, and are thus @Ignored during normal test execution.
|
||||
// However if you wish to execute them as part of a test phase, comment out the @Ignored line for each
|
||||
// desired test.
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@Test
|
||||
@Ignore("Un-authenticated proxy : Comment this out if you want to run against local proxied ES.")
|
||||
public void testQueryElasticsearchBasicBehindProxy() {
|
||||
System.out.println("Starting test " + new Object() {
|
||||
}.getClass().getEnclosingMethod().getName());
|
||||
final TestRunner runner = TestRunners.newTestRunner(new QueryElasticsearchHttp());
|
||||
|
||||
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
|
||||
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
|
||||
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
|
||||
runner.setProperty(QueryElasticsearchHttp.FIELDS, "id,, userinfo.location");
|
||||
|
||||
runner.setProperty(QueryElasticsearchHttp.PROXY_HOST, "localhost");
|
||||
runner.setProperty(QueryElasticsearchHttp.PROXY_PORT, "3228");
|
||||
runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200");
|
||||
|
||||
runner.enqueue("".getBytes(), new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
|
||||
runner.run(1, true, true);
|
||||
runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore("Authenticated Proxy : Comment this out if you want to run against local proxied ES.")
|
||||
public void testQueryElasticsearchBasicBehindAuthenticatedProxy() {
|
||||
System.out.println("Starting test " + new Object() {
|
||||
}.getClass().getEnclosingMethod().getName());
|
||||
final TestRunner runner = TestRunners.newTestRunner(new QueryElasticsearchHttp());
|
||||
runner.setValidateExpressionUsage(true);
|
||||
|
||||
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
|
||||
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
|
||||
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
|
||||
runner.setProperty(QueryElasticsearchHttp.FIELDS, "id,, userinfo.location");
|
||||
|
||||
runner.setProperty(QueryElasticsearchHttp.PROXY_HOST, "localhost");
|
||||
runner.setProperty(QueryElasticsearchHttp.PROXY_PORT, "3328");
|
||||
runner.setProperty(QueryElasticsearchHttp.PROXY_USERNAME, "squid");
|
||||
runner.setProperty(QueryElasticsearchHttp.PROXY_PASSWORD, "changeme");
|
||||
runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200");
|
||||
|
||||
runner.enqueue("".getBytes(), new HashMap<String, String>() {{
|
||||
put("doc_id", "28039652140");
|
||||
}});
|
||||
|
||||
runner.run(1, true, true);
|
||||
runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_SUCCESS, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQueryElasticsearchOnTrigger_withQueryParameters() throws IOException {
|
||||
QueryElasticsearchHttpTestProcessor p = new QueryElasticsearchHttpTestProcessor();
|
||||
|
|
Loading…
Reference in New Issue