- NIFI-9388: add GetElasticsearch to fetch Elasticsearch document using the ElasticsearchClientService

- NIFI-9387: add Proxy capability to ElasticsearchClientService
- NIFI-1576: allow GetElasticsearch to run without requiring FlowFile input

Signed-off-by: Joe Gresock <jgresock@gmail.com>

This closes #5535.
This commit is contained in:
Chris Sampson 2021-11-18 22:06:46 +00:00 committed by Joe Gresock
parent 1724cb0d3b
commit 1745d2a88e
No known key found for this signature in database
GPG Key ID: 37F5B9B6E258C8B7
24 changed files with 681 additions and 76 deletions

View File

@ -43,6 +43,12 @@
<version>1.16.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
<version>1.16.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>

View File

@ -24,6 +24,8 @@ import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.ssl.SSLContextService;
import java.util.List;
@ -49,6 +51,7 @@ public interface ElasticSearchClientService extends ControllerService {
.identifiesControllerService(SSLContextService.class)
.addValidator(Validator.VALID)
.build();
PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxySpec.HTTP);
PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("el-cs-username")
.displayName("Username")

View File

@ -20,7 +20,7 @@ package org.apache.nifi.elasticsearch;
import java.util.HashSet;
import java.util.Set;
public class ElasticsearchError extends RuntimeException {
public class ElasticsearchException extends RuntimeException {
/**
* These are names of common Elasticsearch exceptions where it is safe to assume
* that it's OK to retry the operation instead of just sending it to an error relationship.
@ -32,17 +32,32 @@ public class ElasticsearchError extends RuntimeException {
add("NodeClosedException");
}};
protected boolean isElastic;
protected boolean elastic;
public ElasticsearchError(final Exception ex) {
protected boolean notFound;
public ElasticsearchException(final Exception ex) {
super(ex);
final boolean isKnownException = ELASTIC_ERROR_NAMES.contains(ex.getClass().getSimpleName());
final boolean isServiceUnavailable = "ResponseException".equals(ex.getClass().getSimpleName())
&& ex.getMessage().contains("503 Service Unavailable");
isElastic = isKnownException || isServiceUnavailable;
final boolean isServiceUnavailable;
if ("ResponseException".equals(ex.getClass().getSimpleName())) {
isServiceUnavailable = ex.getMessage().contains("503 Service Unavailable");
notFound = ex.getMessage().contains("404 Not Found");
} else {
isServiceUnavailable = false;
}
elastic = isKnownException || isServiceUnavailable;
}
public boolean isElastic() {
return isElastic;
return elastic;
}
public boolean isNotFound() {
return notFound;
}
}

View File

@ -74,6 +74,12 @@
<version>1.16.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
<version>1.16.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-elasticsearch-client-service-api</artifactId>
@ -143,9 +149,10 @@
to an Elastic-provided Elasticsearch instead of an instance provided by someone else (e.g. AWS OpenSearch)
see: https://opensearch.org/blog/community/2021/08/community-clients/ for more info.
Note: the low-level elasticsearch-rest-client remains licensed with Apache 2.0 even after the move
Note: the low-level elasticsearch-rest-client remains licensed with Apache 2.0
(https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_license.html) even after the move
of the main Elasticsearch product and elasticsearch-rest-high-level-client to Elastic 2.0/SSPL 1.0 in v7.11.0+ -->
<version>7.10.2</version>
<version>7.13.4</version>
<scope>compile</scope>
<exclusions>
<exclusion>

View File

@ -36,6 +36,8 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StopWatch;
@ -50,6 +52,7 @@ import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.Proxy;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@ -79,6 +82,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
props.add(ElasticSearchClientService.USERNAME);
props.add(ElasticSearchClientService.PASSWORD);
props.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE);
props.add(ElasticSearchClientService.PROXY_CONFIGURATION_SERVICE);
props.add(ElasticSearchClientService.CONNECT_TIMEOUT);
props.add(ElasticSearchClientService.SOCKET_TIMEOUT);
props.add(ElasticSearchClientService.RETRY_TIMEOUT);
@ -129,6 +133,8 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final Integer connectTimeout = context.getProperty(CONNECT_TIMEOUT).asInteger();
final Integer readTimeout = context.getProperty(SOCKET_TIMEOUT).asInteger();
final ProxyConfigurationService proxyConfigurationService = context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
final HttpHost[] hh = new HttpHost[hostsSplit.length];
for (int x = 0; x < hh.length; x++) {
final URL u = new URL(hostsSplit[x]);
@ -150,10 +156,22 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
httpClientBuilder.setSSLContext(sslContext);
}
CredentialsProvider credentialsProvider = null;
if (username != null && password != null) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(username, password));
credentialsProvider = addCredentials(null, AuthScope.ANY, username, password);
}
if (proxyConfigurationService != null) {
final ProxyConfiguration proxyConfiguration = proxyConfigurationService.getConfiguration();
if (Proxy.Type.HTTP == proxyConfiguration.getProxyType()) {
final HttpHost proxy = new HttpHost(proxyConfiguration.getProxyServerHost(), proxyConfiguration.getProxyServerPort(), "http");
httpClientBuilder.setProxy(proxy);
credentialsProvider = addCredentials(credentialsProvider, new AuthScope(proxy), proxyConfiguration.getProxyUserName(), proxyConfiguration.getProxyUserPassword());
}
}
if (credentialsProvider != null) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
@ -168,6 +186,19 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
this.client = builder.build();
}
private CredentialsProvider addCredentials(final CredentialsProvider credentialsProvider, final AuthScope authScope, final String username, final String password) {
final CredentialsProvider cp = credentialsProvider != null ? credentialsProvider : new BasicCredentialsProvider();
if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
cp.setCredentials(
authScope == null ? AuthScope.ANY : authScope,
new UsernamePasswordCredentials(username, password)
);
}
return cp;
}
private Response runQuery(final String endpoint, final String query, final String index, final String type, final Map<String, String> requestParameters) {
final StringBuilder sb = new StringBuilder();
if (StringUtils.isNotBlank(index)) {
@ -183,7 +214,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
try {
return performRequest("POST", sb.toString(), requestParameters, queryEntity);
} catch (final Exception e) {
throw new ElasticsearchError(e);
throw new ElasticsearchException(e);
}
}
@ -203,7 +234,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
throw new IOException(errorMessage);
}
} catch (final Exception ex) {
throw new ElasticsearchError(ex);
throw new ElasticsearchException(ex);
}
}
@ -301,7 +332,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
return IndexOperationResponse.fromJsonResponse(rawResponse);
} catch (final Exception ex) {
throw new ElasticsearchError(ex);
throw new ElasticsearchException(ex);
}
}
@ -340,7 +371,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
parseResponseWarningHeaders(response);
return new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
} catch (final Exception ex) {
throw new RuntimeException(ex);
throw new ElasticsearchException(ex);
}
}
@ -389,8 +420,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
return (Map<String, Object>) mapper.readValue(body, Map.class).get("_source");
} catch (final Exception ex) {
getLogger().error("", ex);
return null;
throw new ElasticsearchException(ex);
}
}
@ -414,7 +444,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final Response response = runQuery("_search", query, index, type, requestParameters);
return buildSearchResponse(response);
} catch (final Exception ex) {
throw new RuntimeException(ex);
throw new ElasticsearchException(ex);
}
}
@ -425,7 +455,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final Response response = performRequest("POST", "/_search/scroll", Collections.emptyMap(), scrollEntity);
return buildSearchResponse(response);
} catch (final Exception ex) {
throw new RuntimeException(ex);
throw new ElasticsearchException(ex);
}
}
@ -447,7 +477,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
return (String) mapper.readValue(body, Map.class).get("id");
} catch (final Exception ex) {
throw new RuntimeException(ex);
throw new ElasticsearchException(ex);
}
}
@ -472,11 +502,10 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
if (404 == re.getResponse().getStatusLine().getStatusCode()) {
getLogger().debug("Point in Time {} not found in Elasticsearch for deletion, ignoring", pitId);
return new DeleteOperationResponse(0);
} else {
throw new RuntimeException(re);
}
throw new ElasticsearchException(re);
} catch (final Exception ex) {
throw new RuntimeException(ex);
throw new ElasticsearchException(ex);
}
}
@ -501,11 +530,10 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
if (404 == re.getResponse().getStatusLine().getStatusCode()) {
getLogger().debug("Scroll Id {} not found in Elasticsearch for deletion, ignoring", scrollId);
return new DeleteOperationResponse(0);
} else {
throw new RuntimeException(re);
}
throw new ElasticsearchException(re);
} catch (final Exception ex) {
throw new RuntimeException(ex);
throw new ElasticsearchException(ex);
}
}

View File

@ -84,7 +84,7 @@ public class ElasticSearchStringLookupService extends AbstractControllerService
}
@Override
public Optional<String> lookup(Map<String, Object> coordinates) throws LookupFailureException {
public Optional<String> lookup(final Map<String, Object> coordinates) throws LookupFailureException {
try {
final String id = (String) coordinates.get(ID);
final Map<String, Object> enums = esClient.get(index, type, id, null);
@ -93,7 +93,7 @@ public class ElasticSearchStringLookupService extends AbstractControllerService
} else {
return Optional.ofNullable(mapper.writeValueAsString(enums));
}
} catch (IOException e) {
} catch (final IOException | ElasticsearchException e) {
throw new LookupFailureException(e);
}
}

View File

@ -22,6 +22,7 @@ import org.apache.maven.artifact.versioning.ComparableVersion
import org.apache.nifi.elasticsearch.DeleteOperationResponse
import org.apache.nifi.elasticsearch.ElasticSearchClientService
import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl
import org.apache.nifi.elasticsearch.ElasticsearchException
import org.apache.nifi.elasticsearch.IndexOperationRequest
import org.apache.nifi.elasticsearch.IndexOperationResponse
import org.apache.nifi.elasticsearch.SearchResponse
@ -467,21 +468,24 @@ class ElasticSearchClientService_IT {
void testDeleteById() throws Exception {
final String ID = "1"
final def originalDoc = service.get(INDEX, TYPE, ID, null)
DeleteOperationResponse response = service.deleteById(INDEX, TYPE, ID, null)
Assert.assertNotNull(response)
Assert.assertTrue(response.getTook() > 0)
def doc = service.get(INDEX, TYPE, ID, null)
Assert.assertNull(doc)
doc = service.get(INDEX, TYPE, "2", null)
Assert.assertNotNull(doc)
// replace the deleted doc
service.add(new IndexOperationRequest(INDEX, TYPE, "1", originalDoc, IndexOperationRequest.Operation.Index), null)
waitForIndexRefresh() // (affects later tests using _search or _bulk)
try {
DeleteOperationResponse response = service.deleteById(INDEX, TYPE, ID, null)
Assert.assertNotNull(response)
Assert.assertTrue(response.getTook() > 0)
final ElasticsearchException ee = Assert.assertThrows(ElasticsearchException.class, { ->
service.get(INDEX, TYPE, ID, null) })
Assert.assertTrue(ee.isNotFound())
final def doc = service.get(INDEX, TYPE, "2", null)
Assert.assertNotNull(doc)
} finally {
// replace the deleted doc
service.add(new IndexOperationRequest(INDEX, TYPE, "1", originalDoc, IndexOperationRequest.Operation.Index), null)
waitForIndexRefresh() // (affects later tests using _search or _bulk)
}
}
@Test
void testGet() throws IOException {
void testGet() {
Map old
1.upto(15) { index ->
String id = String.valueOf(index)
@ -492,6 +496,12 @@ class ElasticSearchClientService_IT {
}
}
@Test
void testGetNotFound() {
final ElasticsearchException ee = Assert.assertThrows(ElasticsearchException.class, { -> service.get(INDEX, TYPE, "not_found", null) })
Assert.assertTrue(ee.isNotFound())
}
@Test
void testSSL() {
final String serviceIdentifier = SSLContextService.class.getName()
@ -671,8 +681,10 @@ class ElasticSearchClientService_IT {
deletes.add(new IndexOperationRequest(INDEX, TYPE, UPSERTED_ID, null, IndexOperationRequest.Operation.Delete))
Assert.assertFalse(service.bulk(deletes, [refresh: "true"]).hasErrors())
waitForIndexRefresh() // wait 1s for index refresh (doesn't prevent GET but affects later tests using _search or _bulk)
Assert.assertNull(service.get(INDEX, TYPE, TEST_ID, null))
Assert.assertNull(service.get(INDEX, TYPE, UPSERTED_ID, null))
ElasticsearchException ee = Assert.assertThrows(ElasticsearchException.class, { -> service.get(INDEX, TYPE, TEST_ID, null) })
Assert.assertTrue(ee.isNotFound())
ee = Assert.assertThrows(ElasticsearchException.class, { -> service.get(INDEX, TYPE, UPSERTED_ID, null) })
Assert.assertTrue(ee.isNotFound())
}
@Test

View File

@ -20,6 +20,7 @@ package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticsearchException;
import org.apache.nifi.elasticsearch.OperationResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@ -56,6 +57,7 @@ public abstract class AbstractByQueryElasticsearch extends AbstractProcessor imp
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_FAILURE);
rels.add(REL_RETRY);
relationships = Collections.unmodifiableSet(rels);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
@ -138,12 +140,21 @@ public abstract class AbstractByQueryElasticsearch extends AbstractProcessor imp
input = session.putAllAttributes(input, attrs);
session.transfer(input, REL_SUCCESS);
} catch (final ElasticsearchException ese) {
final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
ese.isElastic() ? "Routing to retry." : "Routing to failure");
getLogger().error(msg, ese);
if (input != null) {
session.penalize(input);
input = session.putAttribute(input, getErrorAttribute(), ese.getMessage());
session.transfer(input, ese.isElastic() ? REL_RETRY : REL_FAILURE);
}
} catch (final Exception e) {
getLogger().error("Error running \"by query\" operation: ", e);
if (input != null) {
input = session.putAttribute(input, getErrorAttribute(), e.getMessage());
session.transfer(input, REL_FAILURE);
}
getLogger().error("Error running \"by query\" operation: ", e);
context.yield();
}
}

View File

@ -22,6 +22,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticsearchException;
import org.apache.nifi.elasticsearch.SearchResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@ -168,9 +169,19 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
final SearchResponse response = doQuery(queryJsonParameters, hitsFlowFiles, session, context, input, stopWatch);
finishQuery(input, queryJsonParameters, session, context, response);
} catch (Exception ex) {
getLogger().error("Error processing flowfile.", ex);
} catch (final ElasticsearchException ese) {
final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
ese.isElastic() ? "Routing to retry." : "Routing to failure");
getLogger().error(msg, ese);
if (input != null) {
session.penalize(input);
input = session.putAttribute(input, "elasticsearch.query.error", ese.getMessage());
session.transfer(input, ese.isElastic() ? REL_RETRY : REL_FAILURE);
}
} catch (Exception ex) {
getLogger().error("Could not query documents.", ex);
if (input != null) {
input = session.putAttribute(input, "elasticsearch.query.error", ex.getMessage());
session.transfer(input, REL_FAILURE);
}
context.yield();

View File

@ -34,7 +34,7 @@ import java.util.Map;
@WritesAttribute(attribute = "elasticsearch.delete.error", description = "The error message provided by Elasticsearch if there is an error running the delete.")
})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@Tags({ "elastic", "elasticsearch", "delete", "query"})
@Tags({ "elastic", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "delete", "query"})
@CapabilityDescription("Delete from an Elasticsearch index using a query. The query can be loaded from a flowfile body " +
"or from the Query parameter.")
@DynamicProperty(

View File

@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticsearchException;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "put", "index", "record"})
@CapabilityDescription("Elasticsearch get processor that uses the official Elastic REST client libraries. " +
"Note that the full body of the document will be read into memory before being written to a FlowFile for transfer.")
@WritesAttributes({
@WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
@WritesAttribute(attribute = "elasticsearch.index", description = "The Elasticsearch index containing the document"),
@WritesAttribute(attribute = "elasticsearch.type", description = "The Elasticsearch document type"),
@WritesAttribute(attribute = "elasticsearch.get.error", description = "The error message provided by Elasticsearch if there is an error fetching the document.")
})
@DynamicProperty(
name = "The name of a URL query parameter to add",
value = "The value of the URL query parameter",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing.")
public class GetElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
static final AllowableValue FLOWFILE_CONTENT = new AllowableValue(
"flowfile-content",
"FlowFile Content",
"Output the retrieved document as the FlowFile content."
);
static final AllowableValue FLOWFILE_ATTRIBUTE = new AllowableValue(
"flowfile-attribute",
"FlowFile Attribute",
"Output the retrieved document as a FlowFile attribute specified by the Attribute Name."
);
static final PropertyDescriptor ID = new PropertyDescriptor.Builder()
.name("get-es-id")
.displayName("Document Id")
.description("The _id of the document to retrieve.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
.name("get-es-destination")
.displayName("Destination")
.description("Indicates whether the retrieved document is written to the FlowFile content or a FlowFile attribute.")
.required(true)
.allowableValues(FLOWFILE_CONTENT, FLOWFILE_ATTRIBUTE)
.defaultValue(FLOWFILE_CONTENT.getValue())
.build();
static final PropertyDescriptor ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
.name("get-es-attribute-name")
.displayName("Attribute Name")
.description("The name of the FlowFile attribute to use for the retrieved document output.")
.required(true)
.defaultValue("elasticsearch.doc")
.dependsOn(DESTINATION, FLOWFILE_ATTRIBUTE)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final Relationship REL_DOC = new Relationship.Builder().name("document")
.description("Fetched documents are routed to this relationship.")
.build();
static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not_found")
.description("A FlowFile is routed to this relationship if the specified document does not exist in the Elasticsearch cluster.")
.build();
static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
ID, INDEX, TYPE, DESTINATION, ATTRIBUTE_NAME, CLIENT_SERVICE
));
static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_DOC, REL_FAILURE, REL_RETRY, REL_NOT_FOUND
)));
private volatile ElasticSearchClientService clientService;
private final ObjectMapper mapper = new ObjectMapper();
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.dynamic(true)
.build();
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile input = session.get();
final String id = context.getProperty(ID).evaluateAttributeExpressions(input).getValue();
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
final String destination = context.getProperty(DESTINATION).getValue();
final String attributeName = context.getProperty(ATTRIBUTE_NAME).evaluateAttributeExpressions(input).getValue();
try {
final StopWatch stopWatch = new StopWatch(true);
final Map<String, Object> doc = clientService.get(index, type, id, getUrlQueryParameters(context, input));
final Map<String, String> attributes = new HashMap<>(4, 1);
attributes.put("filename", id);
attributes.put("elasticsearch.index", index);
if (type != null) {
attributes.put("elasticsearch.type", type);
}
final String json = mapper.writeValueAsString(doc);
FlowFile documentFlowFile = input != null ? input : session.create();
if (FLOWFILE_CONTENT.getValue().equals(destination)) {
documentFlowFile = session.write(documentFlowFile, out -> out.write(json.getBytes()));
} else {
attributes.put(attributeName, json);
}
documentFlowFile = session.putAllAttributes(documentFlowFile, attributes);
session.getProvenanceReporter().receive(documentFlowFile, clientService.getTransitUrl(index, type), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(documentFlowFile, REL_DOC);
} catch (final ElasticsearchException ese) {
if (ese.isNotFound()) {
if (input != null) {
session.transfer(input, REL_NOT_FOUND);
} else {
getLogger().warn("Document with _id {} not found in index {} (and type {})", id, index, type);
}
} else {
final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
ese.isElastic() ? "Routing to retry." : "Routing to failure");
getLogger().error(msg, ese);
if (input != null) {
session.penalize(input);
input = session.putAttribute(input, "elasticsearch.get.error", ese.getMessage());
session.transfer(input, ese.isElastic() ? REL_RETRY : REL_FAILURE);
}
}
} catch (final Exception ex) {
getLogger().error("Could not fetch document.", ex);
if (input != null) {
input = session.putAttribute(input, "elasticsearch.get.error", ex.getMessage());
session.transfer(input, REL_FAILURE);
}
context.yield();
}
}
}

View File

@ -39,11 +39,12 @@ import java.util.concurrent.TimeUnit;
@WritesAttribute(attribute = "mime.type", description = "application/json"),
@WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
@WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
@WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
@WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile"),
@WritesAttribute(attribute = "elasticsearch.query.error", description = "The error message provided by Elasticsearch if there is an error querying the index.")
})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@EventDriven
@Tags({"elasticsearch", "elasticsearch 5", "query", "read", "get", "json"})
@Tags({"elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "query", "read", "get", "json"})
@CapabilityDescription("A processor that allows the user to run a query (with aggregations) written with the " +
"Elasticsearch JSON DSL. It does not automatically paginate queries for the user. If an incoming relationship is added to this " +
"processor, it will use the flowfile's content for the query. Care should be taken on the size of the query because the entire response " +

View File

@ -42,11 +42,12 @@ import java.util.List;
@WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
@WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
@WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
@WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
@WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile"),
@WritesAttribute(attribute = "elasticsearch.query.error", description = "The error message provided by Elasticsearch if there is an error querying the index.")
})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@EventDriven
@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "read", "json"})
@Tags({"elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "query", "scroll", "page", "read", "json"})
@CapabilityDescription("A processor that allows the user to run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
"It will use the flowfile's content for the query unless the QUERY attribute is populated. " +
"Search After/Point in Time queries must include a valid \"sort\" field.")
@ -58,7 +59,7 @@ import java.util.List;
"These parameters will override any matching parameters in the query request body")
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
"from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
public class PaginatedJsonQueryElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {
public class PaginatedJsonQueryElasticsearch extends AbstractPaginatedJsonQueryElasticsearch {
private static final List<PropertyDescriptor> propertyDescriptors;
static {

View File

@ -21,6 +21,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -30,7 +32,7 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticsearchError;
import org.apache.nifi.elasticsearch.ElasticsearchException;
import org.apache.nifi.elasticsearch.IndexOperationRequest;
import org.apache.nifi.elasticsearch.IndexOperationResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
@ -72,14 +74,17 @@ import java.util.Optional;
import java.util.Set;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "put", "index", "record"})
@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "put", "index", "record"})
@CapabilityDescription("A record-aware Elasticsearch put processor that uses the official Elastic REST client libraries.")
@WritesAttributes({
@WritesAttribute(attribute = "elasticsearch.put.error", description = "The error message provided by Elasticsearch if there is an error indexing the documents.")
})
@DynamicProperty(
name = "The name of a URL query parameter to add",
value = "The value of the URL query parameter",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " +
"These parameters will override any matching parameters in the query request body")
"These parameters will override any matching parameters in the _bulk request body")
public class PutElasticsearchRecord extends AbstractProcessor implements ElasticsearchRestProcessor {
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("put-es-record-reader")
@ -346,7 +351,7 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final FlowFile input = session.get();
FlowFile input = session.get();
if (input == null) {
return;
}
@ -415,18 +420,21 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
badRecords.add(bad);
}
}
} catch (final ElasticsearchError ese) {
} catch (final ElasticsearchException ese) {
final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
ese.isElastic() ? "Moving to retry." : "Moving to failure");
ese.isElastic() ? "Routing to retry." : "Routing to failure");
getLogger().error(msg, ese);
final Relationship rel = ese.isElastic() ? REL_RETRY : REL_FAILURE;
session.penalize(input);
input = session.putAttribute(input, "elasticsearch.put.error", ese.getMessage());
session.transfer(input, rel);
removeBadRecordFlowFiles(badRecords, session);
return;
} catch (final Exception ex) {
getLogger().error("Could not index documents.", ex);
input = session.putAttribute(input, "elasticsearch.put.error", ex.getMessage());
session.transfer(input, REL_FAILURE);
context.yield();
removeBadRecordFlowFiles(badRecords, session);
return;
}

View File

@ -55,13 +55,14 @@ import java.util.Set;
@WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
@WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
@WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
@WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile")
@WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile"),
@WritesAttribute(attribute = "elasticsearch.query.error", description = "The error message provided by Elasticsearch if there is an error querying the index.")
})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@TriggerSerially
@PrimaryNodeOnly
@DefaultSchedule(period="1 min")
@Tags({"elasticsearch", "elasticsearch 5", "query", "scroll", "page", "search", "json"})
@Tags({"elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "query", "scroll", "page", "search", "json"})
@CapabilityDescription("A processor that allows the user to repeatedly run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
"Search After/Point in Time queries must include a valid \"sort\" field. The processor will retrieve multiple pages of results " +
"until either no more results are available or the Pagination Keep Alive expiration is reached, after which the query will " +
@ -77,7 +78,7 @@ import java.util.Set;
"(when the current time is later than the last query execution plus the Pagination Keep Alive interval).")
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
"from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
public class SearchElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {
public class SearchElasticsearch extends AbstractPaginatedJsonQueryElasticsearch {
static final String STATE_SCROLL_ID = "scrollId";
static final String STATE_PIT_ID = "pitId";
static final String STATE_SEARCH_AFTER = "searchAfter";

View File

@ -34,7 +34,7 @@ import java.util.Map;
@WritesAttribute(attribute = "elasticsearch.update.error", description = "The error message provided by Elasticsearch if there is an error running the update.")
})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@Tags({ "elastic", "elasticsearch", "update", "query"})
@Tags({ "elastic", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "update", "query"})
@CapabilityDescription("Update documents in an Elasticsearch index using a query. The query can be loaded from a flowfile body " +
"or from the Query parameter.")
@DynamicProperty(

View File

@ -19,3 +19,4 @@ org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch
org.apache.nifi.processors.elasticsearch.SearchElasticsearch
org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord
org.apache.nifi.processors.elasticsearch.UpdateByQueryElasticsearch
org.apache.nifi.processors.elasticsearch.GetElasticsearch

View File

@ -279,7 +279,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
runOnce(runner)
final TestElasticsearchClientService service = runner.getControllerService("esService") as TestElasticsearchClientService
final TestElasticsearchClientService service = getService(runner)
if (getProcessor() instanceof SearchElasticsearch || getProcessor() instanceof PaginatedJsonQueryElasticsearch) {
Assert.assertEquals(3, service.getRequestParameters().size())
Assert.assertEquals("600s", service.getRequestParameters().get("scroll"))

View File

@ -178,7 +178,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
// check error was caught and logged
assertThat(runner.getLogger().getErrorMessages().stream()
.anyMatch({ logMessage ->
logMessage.getMsg().contains("Error processing flowfile") &&
logMessage.getMsg().contains("Could not query documents") &&
logMessage.getThrowable().getMessage() == "Simulated IOException - initialisePointInTime"
}),
is(true)
@ -199,7 +199,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
// check error was caught and logged
assertThat(runner.getLogger().getErrorMessages().stream()
.anyMatch({ logMessage ->
logMessage.getMsg().contains("Error processing flowfile") &&
logMessage.getMsg().contains("Could not query documents") &&
logMessage.getThrowable().getMessage() == "Query using pit/search_after must contain a \"sort\" field"
}),
is(true)
@ -213,7 +213,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
testCounts(runner, 0, 0, isInput() ? 1 : 0, 0)
assertThat(runner.getLogger().getErrorMessages().stream()
.anyMatch({ logMessage ->
logMessage.getMsg().contains("Error processing flowfile") &&
logMessage.getMsg().contains("Could not query documents") &&
logMessage.getThrowable().getMessage() == "Query using pit/search_after must contain a \"sort\" field"
}),
is(true)

View File

@ -0,0 +1,272 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License") you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.provenance.ProvenanceEventType
import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.hamcrest.MatcherAssert
import org.junit.Assert
import org.junit.Test
import static groovy.json.JsonOutput.toJson
import static org.hamcrest.CoreMatchers.equalTo
import static org.hamcrest.CoreMatchers.is
import static org.hamcrest.MatcherAssert.assertThat
import static org.junit.Assert.assertThrows
class GetElasticsearchTest {
static final String INDEX_NAME = "messages"
@Test
void testMandatoryProperties() {
final TestRunner runner = createRunner()
runner.removeProperty(GetElasticsearch.CLIENT_SERVICE)
runner.removeProperty(GetElasticsearch.INDEX)
runner.removeProperty(GetElasticsearch.TYPE)
runner.removeProperty(GetElasticsearch.ID)
runner.removeProperty(GetElasticsearch.DESTINATION)
runner.removeProperty(GetElasticsearch.ATTRIBUTE_NAME)
final AssertionError assertionError = assertThrows(AssertionError.class, runner.&run)
assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 3 validation failures:\n" +
"'%s' is invalid because %s is required\n" +
"'%s' is invalid because %s is required\n" +
"'%s' is invalid because %s is required\n",
GetElasticsearch.ID.getDisplayName(), GetElasticsearch.ID.getDisplayName(),
GetElasticsearch.INDEX.getDisplayName(), GetElasticsearch.INDEX.getDisplayName(),
GetElasticsearch.CLIENT_SERVICE.getDisplayName(), GetElasticsearch.CLIENT_SERVICE.getDisplayName()
)))
}
@Test
void testInvalidProperties() {
final TestRunner runner = createRunner()
runner.setProperty(GetElasticsearch.CLIENT_SERVICE, "not-a-service")
runner.setProperty(GetElasticsearch.INDEX, "")
runner.setProperty(GetElasticsearch.TYPE, "")
runner.setProperty(GetElasticsearch.ID, "")
runner.setProperty(GetElasticsearch.DESTINATION, "not-valid")
runner.setProperty(GetElasticsearch.ATTRIBUTE_NAME, "")
final AssertionError assertionError = assertThrows(AssertionError.class, runner.&run)
assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 6 validation failures:\n" +
"'%s' validated against '' is invalid because %s cannot be empty\n" +
"'%s' validated against '' is invalid because %s cannot be empty\n" +
"'%s' validated against '' is invalid because %s cannot be empty\n" +
"'%s' validated against 'not-valid' is invalid because Given value not found in allowed set '%s'\n" +
"'%s' validated against 'not-a-service' is invalid because Property references a Controller Service that does not exist\n" +
"'%s' validated against 'not-a-service' is invalid because Invalid Controller Service: not-a-service is not a valid Controller Service Identifier\n",
GetElasticsearch.ID.getName(), GetElasticsearch.ID.getName(),
GetElasticsearch.INDEX.getName(), GetElasticsearch.INDEX.getName(),
GetElasticsearch.TYPE.getName(), GetElasticsearch.TYPE.getName(),
GetElasticsearch.DESTINATION.getName(), [GetElasticsearch.FLOWFILE_CONTENT.getValue(), GetElasticsearch.FLOWFILE_ATTRIBUTE.getValue()].join(", "),
GetElasticsearch.CLIENT_SERVICE.getDisplayName(),
GetElasticsearch.CLIENT_SERVICE.getDisplayName()
)))
}
@Test
void testInvalidAttributeName() {
final TestRunner runner = createRunner()
runner.setProperty(GetElasticsearch.DESTINATION, GetElasticsearch.FLOWFILE_ATTRIBUTE)
runner.setProperty(GetElasticsearch.ATTRIBUTE_NAME, "")
final AssertionError assertionError = assertThrows(AssertionError.class, runner.&run)
assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 1 validation failures:\n" +
"'%s' validated against '' is invalid because %s cannot be empty\n",
GetElasticsearch.ATTRIBUTE_NAME.getName(), GetElasticsearch.ATTRIBUTE_NAME.getName()
)))
}
@Test
void testFetch() throws Exception {
final TestRunner runner = createRunner()
runProcessor(runner)
testCounts(runner, 1, 0, 0, 0)
final FlowFile doc = runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).get(0)
assertOutputContent(doc.getContent())
MatcherAssert.assertThat(
runner.getProvenanceEvents().stream().filter({ pe ->
pe.getEventType() == ProvenanceEventType.RECEIVE &&
pe.getAttribute("uuid") == doc.getAttribute("uuid")
}).count(),
is(1L)
)
}
@Test
void testInputHandlingDestinationContent() {
final TestRunner runner = createRunner()
runner.setProperty(GetElasticsearch.DESTINATION, GetElasticsearch.FLOWFILE_CONTENT)
runner.setIncomingConnection(true)
runProcessor(runner)
testCounts(runner, 1, 0, 0, 0)
MockFlowFile doc = runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).get(0)
assertOutputContent(doc.getContent())
assertCommonAttributes(doc)
assertOutputAttribute(doc)
reset(runner)
runner.setIncomingConnection(false)
runner.run()
testCounts(runner, 1, 0, 0, 0)
doc = runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).get(0)
assertOutputContent(doc.getContent())
assertCommonAttributes(doc)
assertOutputAttribute(doc)
}
@Test
void testDestinationAttribute() {
final TestRunner runner = createRunner()
runner.setProperty(GetElasticsearch.DESTINATION, GetElasticsearch.FLOWFILE_ATTRIBUTE)
runner.setIncomingConnection(true)
runProcessor(runner)
testCounts(runner, 1, 0, 0, 0)
MockFlowFile doc = runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).get(0)
assertThat(doc.getContent(), is("test"))
assertCommonAttributes(doc)
assertOutputAttribute(doc, true)
reset(runner)
// non-default attribute name and fetch without type
runner.setProperty(GetElasticsearch.ATTRIBUTE_NAME, "my_attr")
runner.removeProperty(GetElasticsearch.TYPE)
runner.setIncomingConnection(false)
runner.run()
testCounts(runner, 1, 0, 0, 0)
doc = runner.getFlowFilesForRelationship(GetElasticsearch.REL_DOC).get(0)
assertThat(doc.getContent(), is(""))
assertCommonAttributes(doc, false)
assertOutputAttribute(doc, true, "my_attr")
}
@Test
void testErrorDuringFetch() throws Exception {
final TestRunner runner = createRunner()
getService(runner).setThrowErrorInGet(true)
runner.setIncomingConnection(true)
runProcessor(runner)
testCounts(runner, 0, 1, 0, 0)
reset(runner)
runner.setIncomingConnection(false)
runner.run()
testCounts(runner, 0, 0, 0, 0)
}
@Test
void testNotFound() throws Exception {
final TestRunner runner = createRunner()
getService(runner).setThrowNotFoundInGet(true)
runProcessor(runner)
testCounts(runner, 0, 0, 0, 1)
reset(runner)
}
@Test
void testRequestParameters() {
final TestRunner runner = createRunner()
runner.setProperty("refresh", "true")
runner.setProperty("_source", '${source}')
runner.setVariable("source", "msg")
runProcessor(runner)
final TestElasticsearchClientService service = getService(runner)
Assert.assertEquals(2, service.getRequestParameters().size())
Assert.assertEquals("true", service.getRequestParameters().get("refresh"))
Assert.assertEquals("msg", service.getRequestParameters().get("_source"))
}
private static void testCounts(final TestRunner runner, final int doc, final int failure, final int retry, final int notFound) {
runner.assertTransferCount(GetElasticsearch.REL_DOC, doc)
runner.assertTransferCount(GetElasticsearch.REL_FAILURE, failure)
runner.assertTransferCount(GetElasticsearch.REL_RETRY, retry)
runner.assertTransferCount(GetElasticsearch.REL_NOT_FOUND, notFound)
}
private static void assertOutputContent(final String content) {
assertThat(content, is(toJson(["msg": "one"])))
}
private static void assertOutputAttribute(final MockFlowFile doc, final boolean attributeOutput = false, final String attr = "elasticsearch.doc") {
if (attributeOutput) {
doc.assertAttributeEquals(attr, toJson(["msg": "one"]))
} else {
doc.assertAttributeNotExists(attr)
}
}
private static void assertCommonAttributes(final MockFlowFile doc, final boolean type = true, final boolean error = false) {
doc.assertAttributeEquals("filename", "doc_1")
doc.assertAttributeEquals("elasticsearch.index", INDEX_NAME)
if (type) {
doc.assertAttributeEquals("elasticsearch.type", "message")
} else {
doc.assertAttributeNotExists("elasticsearch.type")
}
if (error) {
doc.assertAttributeEquals("elasticsearch.get.error", "message")
} else {
doc.assertAttributeNotExists("elasticsearch.get.error")
}
}
private static TestRunner createRunner() {
final GetElasticsearch processor = new GetElasticsearch()
final TestRunner runner = TestRunners.newTestRunner(processor)
final TestElasticsearchClientService service = new TestElasticsearchClientService(false)
runner.addControllerService("esService", service)
runner.enableControllerService(service)
runner.setProperty(GetElasticsearch.CLIENT_SERVICE, "esService")
runner.setProperty(GetElasticsearch.INDEX, INDEX_NAME)
runner.setProperty(GetElasticsearch.TYPE, "message")
runner.setProperty(GetElasticsearch.ID, "doc_1")
runner.setProperty(GetElasticsearch.DESTINATION, GetElasticsearch.FLOWFILE_CONTENT)
runner.setProperty(GetElasticsearch.ATTRIBUTE_NAME, "elasticsearch.doc")
runner.setValidateExpressionUsage(true)
return runner
}
private static MockFlowFile runProcessor(final TestRunner runner) {
final MockFlowFile ff = runner.enqueue("test")
runner.run()
return ff
}
private static TestElasticsearchClientService getService(final TestRunner runner) {
return runner.getControllerService("esService", TestElasticsearchClientService.class)
}
private static void reset(final TestRunner runner) {
runner.clearProvenanceEvents()
runner.clearTransferState()
}
}

View File

@ -64,7 +64,7 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
testCounts(runner, 0, 0, 0, 0)
assertThat(runner.getLogger().getErrorMessages().stream()
.anyMatch({ logMessage ->
logMessage.getMsg().contains("Error processing flowfile") &&
logMessage.getMsg().contains("Could not query documents") &&
logMessage.getThrowable().getMessage() == "Simulated IOException - scroll"
}),
is(true)

View File

@ -21,14 +21,20 @@ import groovy.json.JsonSlurper
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.elasticsearch.DeleteOperationResponse
import org.apache.nifi.elasticsearch.ElasticSearchClientService
import org.apache.nifi.elasticsearch.ElasticsearchException
import org.apache.nifi.elasticsearch.IndexOperationRequest
import org.apache.nifi.elasticsearch.IndexOperationResponse
import org.apache.nifi.elasticsearch.SearchResponse
import org.apache.nifi.elasticsearch.UpdateOperationResponse
import org.apache.nifi.processors.elasticsearch.mock.MockElasticsearchException
import org.elasticsearch.client.Response
import org.elasticsearch.client.ResponseException
class TestElasticsearchClientService extends AbstractControllerService implements ElasticSearchClientService {
private boolean returnAggs
private boolean throwErrorInSearch
private boolean throwErrorInGet
private boolean throwNotFoundInGet
private boolean throwErrorInDelete
private boolean throwErrorInPit
private boolean throwErrorInUpdate
@ -43,7 +49,11 @@ class TestElasticsearchClientService extends AbstractControllerService implement
private void common(boolean throwError, Map<String, String> requestParameters) {
if (throwError) {
throw new IOException("Simulated IOException")
if (throwNotFoundInGet) {
throw new MockElasticsearchException(false, true)
} else {
throw new IOException("Simulated IOException")
}
}
this.requestParameters = requestParameters
}
@ -89,7 +99,7 @@ class TestElasticsearchClientService extends AbstractControllerService implement
@Override
Map<String, Object> get(String index, String type, String id, Map<String, String> requestParameters) {
common(false, requestParameters)
common(throwErrorInGet || throwNotFoundInGet, requestParameters)
return [ "msg": "one" ]
}
@ -298,6 +308,14 @@ class TestElasticsearchClientService extends AbstractControllerService implement
" }\n" +
" ]"
void setThrowNotFoundInGet(boolean throwNotFoundInGet) {
this.throwNotFoundInGet = throwNotFoundInGet
}
void setThrowErrorInGet(boolean throwErrorInGet) {
this.throwErrorInGet = throwErrorInGet
}
void setThrowErrorInSearch(boolean throwErrorInSearch) {
this.throwErrorInSearch = throwErrorInSearch
}

View File

@ -29,9 +29,9 @@ class MockBulkLoadClientService extends AbstractMockElasticsearchClient {
@Override
IndexOperationResponse bulk(List<IndexOperationRequest> items, Map<String, String> requestParameters) {
if (throwRetriableError) {
throw new MockElasticsearchError(true)
throw new MockElasticsearchException(true, false)
} else if (throwFatalError) {
throw new MockElasticsearchError(false)
throw new MockElasticsearchException(false, false)
}
if (evalClosure) {

View File

@ -17,15 +17,16 @@
package org.apache.nifi.processors.elasticsearch.mock
import org.apache.nifi.elasticsearch.ElasticsearchError
import org.apache.nifi.elasticsearch.ElasticsearchException
class MockElasticsearchError extends ElasticsearchError {
MockElasticsearchError(boolean isElastic) {
class MockElasticsearchException extends ElasticsearchException {
MockElasticsearchException(boolean elastic, boolean notFound) {
this(new Exception())
this.isElastic = isElastic
this.elastic = elastic
this.notFound = notFound
}
MockElasticsearchError(Exception ex) {
MockElasticsearchException(Exception ex) {
super(ex)
}
}