From 00412f6e97fdc077cf9c8c0d365ebe4876a33305 Mon Sep 17 00:00:00 2001 From: Joe Gresock Date: Sun, 28 Aug 2016 10:09:32 +0000 Subject: [PATCH] NIFI-2417: Adding Query and Scroll processors for Elasticsearch Signed-off-by: Matt Burgess --- .../nifi-elasticsearch-processors/pom.xml | 2 +- .../elasticsearch/QueryElasticsearchHttp.java | 410 ++++++++++++++++ .../elasticsearch/RetryableException.java | 42 ++ .../ScrollElasticsearchHttp.java | 415 ++++++++++++++++ .../elasticsearch/UnretryableException.java | 43 ++ .../org.apache.nifi.processor.Processor | 2 + .../ITQueryElasticsearchHttp.java | 94 ++++ .../ITScrollElasticsearchHttp.java | 64 +++ .../TestQueryElasticsearchHttp.java | 443 ++++++++++++++++++ .../TestScrollElasticsearchHttp.java | 398 ++++++++++++++++ .../src/test/resources/query-page1.json | 57 +++ .../src/test/resources/query-page2.json | 36 ++ .../src/test/resources/query-page3.json | 14 + .../src/test/resources/scroll-page1.json | 56 +++ .../src/test/resources/scroll-page2.json | 36 ++ .../src/test/resources/scroll-page3.json | 15 + 16 files changed, 2126 insertions(+), 1 deletion(-) create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/RetryableException.java create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UnretryableException.java create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/query-page1.json create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/query-page2.json create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/query-page3.json create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/scroll-page1.json create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/scroll-page2.json create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/scroll-page3.json diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml index 4766815a43..bd47b7d46b 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml @@ -92,7 +92,7 @@ language governing permissions and limitations under the License. --> apache-rat-plugin - src/test/resources/DocumentExample.json + src/test/resources/*.json diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java new file mode 100644 index 0000000000..ba3969faaa --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.codehaus.jackson.JsonNode; + +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import okhttp3.Response; +import okhttp3.ResponseBody; + +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@SupportsBatching +@Tags({ "elasticsearch", "query", "read", "get", "http" }) +@CapabilityDescription("Queries Elasticsearch using the specified connection properties. " + + "Note that the full body of each page of documents will be read into memory before being " + + "written to Flow Files for transfer. Also note that the Elasticsearch max_result_window index " + + "setting is the upper bound on the number of records that can be retrieved using this query. " + + "To retrieve more records, use the ScrollElasticsearchHttp processor.") +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"), + @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"), + @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type"), + @WritesAttribute(attribute = "es.result.*", description = "If Target is 'Flow file attributes', the JSON attributes of " + + "each result will be placed into corresponding attributes with this prefix.") }) +public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + + private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include"; + private static final String QUERY_QUERY_PARAM = "q"; + private static final String SORT_QUERY_PARAM = "sort"; + private static final String FROM_QUERY_PARAM = "from"; + private static final String SIZE_QUERY_PARAM = "size"; + + public static final String TARGET_FLOW_FILE_CONTENT = "Flow file content"; + public static final String TARGET_FLOW_FILE_ATTRIBUTES = "Flow file attributes"; + private static final String ATTRIBUTE_PREFIX = "es.result."; + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description( + "All FlowFiles that are read from Elasticsearch are routed to this relationship.") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description( + "All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming " + + "flow files will be routed to failure.").build(); + + public static final Relationship REL_RETRY = new Relationship.Builder() + .name("retry") + .description( + "A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may " + + "succeed. Note that if the processor has no incoming connections, flow files may still be sent to this relationship " + + "based on the processor properties and the results of the fetch operation.") + .build(); + + public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() + .name("query-es-query").displayName("Query") + .description("The Lucene-style query to run against ElasticSearch").required(true) + .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder() + .name("query-es-index").displayName("Index") + .description("The name of the index to read from").required(true) + .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder() + .name("query-es-type") + .displayName("Type") + .description( + "The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set " + + "to _all, the first document matching the identifier across all types will be retrieved.") + .required(false).expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder() + .name("query-es-fields") + .displayName("Fields") + .description( + "A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, " + + "then the entire document's source will be retrieved.") + .required(false).expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder() + .name("query-es-sort") + .displayName("Sort") + .description( + "A sort parameter (e.g., timestamp:asc). If the Sort property is left blank, " + + "then the results will be retrieved in document order.") + .required(false).expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder() + .name("query-es-size").displayName("Page Size").defaultValue("20") + .description("Determines how many documents to return per page during scrolling.") + .required(true).expressionLanguageSupported(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build(); + + public static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder() + .name("query-es-limit").displayName("Limit") + .description("If set, limits the number of results that will be returned.") + .required(false).expressionLanguageSupported(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build(); + + public static final PropertyDescriptor TARGET = new PropertyDescriptor.Builder() + .name("query-es-target") + .displayName("Target") + .description( + "Indicates where the results should be placed. In the case of 'Flow file content', the JSON " + + "response will be written as the content of the flow file. In the case of 'Flow file attributes', " + + "the original flow file (if applicable) will be cloned for each result, and all return fields will be placed " + + "in a flow file attribute of the same name, but prefixed by 'es.result.'") + .required(true).expressionLanguageSupported(false) + .defaultValue(TARGET_FLOW_FILE_CONTENT) + .allowableValues(TARGET_FLOW_FILE_CONTENT, TARGET_FLOW_FILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + @Override + public Set getRelationships() { + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + relationships.add(REL_RETRY); + return Collections.unmodifiableSet(relationships); + } + + @Override + public final List getSupportedPropertyDescriptors() { + final List descriptors = new ArrayList<>(); + descriptors.add(ES_URL); + descriptors.add(PROP_SSL_CONTEXT_SERVICE); + descriptors.add(USERNAME); + descriptors.add(PASSWORD); + descriptors.add(CONNECT_TIMEOUT); + descriptors.add(RESPONSE_TIMEOUT); + descriptors.add(QUERY); + descriptors.add(PAGE_SIZE); + descriptors.add(INDEX); + descriptors.add(TYPE); + descriptors.add(FIELDS); + descriptors.add(SORT); + descriptors.add(LIMIT); + descriptors.add(TARGET); + + return Collections.unmodifiableList(descriptors); + } + + @OnScheduled + public void setup(ProcessContext context) { + super.setup(context); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) + throws ProcessException { + + FlowFile flowFile = null; + if (context.hasIncomingConnection()) { + flowFile = session.get(); + + // If we have no FlowFile, and all incoming connections are self-loops then we can + // continue on. + // However, if we have no FlowFile and we have connections coming from other Processors, + // then + // we know that we should run only if we have a FlowFile. + if (flowFile == null && context.hasNonLoopConnection()) { + return; + } + } + + OkHttpClient okHttpClient = getClient(); + + final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile) + .getValue(); + final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile) + .getValue(); + final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile) + .getValue(); + final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile) + .asInteger().intValue(); + final Integer limit = context.getProperty(LIMIT).isSet() ? context.getProperty(LIMIT) + .evaluateAttributeExpressions(flowFile).asInteger().intValue() : null; + final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS) + .evaluateAttributeExpressions(flowFile).getValue() : null; + final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT) + .evaluateAttributeExpressions(flowFile).getValue() : null; + final boolean targetIsContent = context.getProperty(TARGET).getValue() + .equals(TARGET_FLOW_FILE_CONTENT); + + // Authentication + final String username = context.getProperty(USERNAME).getValue(); + final String password = context.getProperty(PASSWORD).getValue(); + + final ComponentLog logger = getLogger(); + + int fromIndex = 0; + int numResults; + + try { + logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[] { index, docType, + query }); + + final long startNanos = System.nanoTime(); + // read the url property from the context + final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).getValue()); + + boolean hitLimit = false; + do { + int mPageSize = pageSize; + if (limit != null && limit <= (fromIndex + pageSize)) { + mPageSize = limit - fromIndex; + hitLimit = true; + } + + final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort, + mPageSize, fromIndex); + + final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl, + username, password, "GET", null); + numResults = this.getPage(getResponse, queryUrl, context, session, flowFile, + logger, startNanos, targetIsContent); + fromIndex += pageSize; + } while (numResults > 0 && !hitLimit); + + if (flowFile != null) { + session.remove(flowFile); + } + } catch (IOException ioe) { + logger.error( + "Failed to read from Elasticsearch due to {}, this may indicate an error in configuration " + + "(hosts, username/password, etc.). Routing to retry", + new Object[] { ioe.getLocalizedMessage() }, ioe); + if (flowFile != null) { + session.transfer(flowFile, REL_RETRY); + } + context.yield(); + + } catch (RetryableException e) { + logger.error(e.getMessage(), new Object[] { e.getLocalizedMessage() }, e); + if (flowFile != null) { + session.transfer(flowFile, REL_RETRY); + } + context.yield(); + } catch (Exception e) { + logger.error("Failed to read {} from Elasticsearch due to {}", new Object[] { flowFile, + e.getLocalizedMessage() }, e); + if (flowFile != null) { + session.transfer(flowFile, REL_FAILURE); + } + context.yield(); + } + } + + private int getPage(final Response getResponse, final URL url, final ProcessContext context, + final ProcessSession session, FlowFile flowFile, final ComponentLog logger, + final long startNanos, boolean targetIsContent) + throws IOException { + List page = new ArrayList<>(); + final int statusCode = getResponse.code(); + + if (isSuccess(statusCode)) { + ResponseBody body = getResponse.body(); + final byte[] bodyBytes = body.bytes(); + JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes)); + JsonNode hits = responseJson.get("hits").get("hits"); + + for(int i = 0; i < hits.size(); i++) { + JsonNode hit = hits.get(i); + String retrievedId = hit.get("_id").asText(); + String retrievedIndex = hit.get("_index").asText(); + String retrievedType = hit.get("_type").asText(); + + FlowFile documentFlowFile = null; + if (flowFile != null) { + documentFlowFile = targetIsContent ? session.create(flowFile) : session.clone(flowFile); + } else { + documentFlowFile = session.create(); + } + + JsonNode source = hit.get("_source"); + documentFlowFile = session.putAttribute(documentFlowFile, "es.index", retrievedIndex); + documentFlowFile = session.putAttribute(documentFlowFile, "es.type", retrievedType); + + if (targetIsContent) { + documentFlowFile = session.putAttribute(documentFlowFile, "filename", retrievedId); + documentFlowFile = session.write(documentFlowFile, out -> { + out.write(source.toString().getBytes()); + }); + } else { + Map attributes = new HashMap<>(); + for(Iterator> it = source.getFields(); it.hasNext(); ) { + Entry entry = it.next(); + attributes.put(ATTRIBUTE_PREFIX + entry.getKey(), entry.getValue().asText()); + } + documentFlowFile = session.putAllAttributes(documentFlowFile, attributes); + } + page.add(documentFlowFile); + } + logger.debug("Elasticsearch retrieved " + responseJson.size() + " documents, routing to success"); + + session.transfer(page, REL_SUCCESS); + } else { + try { + // 5xx -> RETRY, but a server error might last a while, so yield + if (statusCode / 100 == 5) { + throw new RetryableException(String.format("Elasticsearch returned code %s with message %s, transferring flow file to retry. This is likely a server problem, yielding...", + statusCode, getResponse.message())); + } else if (context.hasIncomingConnection()) { // 1xx, 3xx, 4xx -> NO RETRY + throw new UnretryableException(String.format("Elasticsearch returned code %s with message %s, transferring flow file to failure", + statusCode, getResponse.message())); + } else { + logger.warn("Elasticsearch returned code {} with message {}", new Object[]{statusCode, getResponse.message()}); + } + } finally { + if (!page.isEmpty()) { + session.remove(page); + page.clear(); + } + } + } + + // emit provenance event + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + if (!page.isEmpty()) { + if (context.hasNonLoopConnection()) { + page.forEach(f -> session.getProvenanceReporter().fetch(f, url.toExternalForm(), millis)); + } else { + page.forEach(f -> session.getProvenanceReporter().receive(f, url.toExternalForm(), millis)); + } + } + return page.size(); + } + + private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields, + String sort, int pageSize, int fromIndex) throws MalformedURLException { + if (StringUtils.isEmpty(baseUrl)) { + throw new MalformedURLException("Base URL cannot be null"); + } + HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder(); + builder.addPathSegment(index); + builder.addPathSegment((StringUtils.isEmpty(type)) ? "_all" : type); + builder.addPathSegment("_search"); + builder.addQueryParameter(QUERY_QUERY_PARAM, query); + builder.addQueryParameter(SIZE_QUERY_PARAM, String.valueOf(pageSize)); + builder.addQueryParameter(FROM_QUERY_PARAM, String.valueOf(fromIndex)); + if (!StringUtils.isEmpty(fields)) { + String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(",")); + builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields); + } + if (!StringUtils.isEmpty(sort)) { + String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(",")); + builder.addQueryParameter(SORT_QUERY_PARAM, trimmedFields); + } + + return builder.build().url(); + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/RetryableException.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/RetryableException.java new file mode 100644 index 0000000000..8e9414532e --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/RetryableException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +/** + * Represents a retryable exception from ElasticSearch. + */ +public class RetryableException extends RuntimeException { + + private static final long serialVersionUID = -2755015600102381620L; + + public RetryableException() { + super(); + } + + public RetryableException(String message, Throwable cause) { + super(message, cause); + } + + public RetryableException(String message) { + super(message); + } + + public RetryableException(Throwable cause) { + super(cause); + } + +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java new file mode 100644 index 0000000000..accdca9f2c --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.codehaus.jackson.JsonNode; + +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import okhttp3.Response; +import okhttp3.ResponseBody; + +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@EventDriven +@SupportsBatching +@Tags({ "elasticsearch", "query", "scroll", "read", "get", "http" }) +@CapabilityDescription("Scrolls through an Elasticsearch query using the specified connection properties. " + + "This processor is intended to be run on the primary node, and is designed for scrolling through " + + "huge result sets, as in the case of a reindex. The state must be cleared before another query " + + "can be run. Each page of results is returned, wrapped in a JSON object like so: { \"hits\" : [ , , ] }. " + + "Note that the full body of each page of documents will be read into memory before being " + + "written to a Flow File for transfer.") +@WritesAttributes({ + @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"), + @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") }) +@Stateful(description = "After each successful scroll page, the latest scroll_id is persisted in scrollId as input for the next scroll call. " + + "Once the entire query is complete, finishedQuery state will be set to true, and the processor will not execute unless this is cleared.", scopes = { Scope.LOCAL }) +public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor { + + private static final String FINISHED_QUERY_STATE = "finishedQuery"; + private static final String SCROLL_ID_STATE = "scrollId"; + private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include"; + private static final String QUERY_QUERY_PARAM = "q"; + private static final String SORT_QUERY_PARAM = "sort"; + private static final String SCROLL_QUERY_PARAM = "scroll"; + private static final String SCROLL_ID_QUERY_PARAM = "scroll_id"; + private static final String SIZE_QUERY_PARAM = "size"; + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description( + "All FlowFiles that are read from Elasticsearch are routed to this relationship.") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description( + "All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming " + + "flow files will be routed to failure.").build(); + + public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() + .name("scroll-es-query").displayName("Query") + .description("The Lucene-style query to run against ElasticSearch").required(true) + .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor SCROLL_DURATION = new PropertyDescriptor.Builder() + .name("scroll-es-scroll") + .displayName("Scroll Duration") + .description("The scroll duration is how long each search context is kept in memory.") + .defaultValue("1m") + .required(true) + .expressionLanguageSupported(true) + .addValidator( + StandardValidators.createRegexMatchingValidator(Pattern.compile("[0-9]+(m|h)"))) + .build(); + + public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder() + .name("scroll-es-index").displayName("Index") + .description("The name of the index to read from").required(true) + .expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder() + .name("scroll-es-type") + .displayName("Type") + .description( + "The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set " + + "to _all, the first document matching the identifier across all types will be retrieved.") + .required(false).expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder() + .name("scroll-es-fields") + .displayName("Fields") + .description( + "A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, " + + "then the entire document's source will be retrieved.") + .required(false).expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder() + .name("scroll-es-sort") + .displayName("Sort") + .description( + "A sort parameter (e.g., timestamp:asc). If the Sort property is left blank, " + + "then the results will be retrieved in document order.") + .required(false).expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder() + .name("scroll-es-size").displayName("Page Size").defaultValue("20") + .description("Determines how many documents to return per page during scrolling.") + .required(true).expressionLanguageSupported(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build(); + + @Override + public Set getRelationships() { + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + return Collections.unmodifiableSet(relationships); + } + + @Override + public final List getSupportedPropertyDescriptors() { + final List descriptors = new ArrayList<>(); + descriptors.add(ES_URL); + descriptors.add(PROP_SSL_CONTEXT_SERVICE); + descriptors.add(USERNAME); + descriptors.add(PASSWORD); + descriptors.add(CONNECT_TIMEOUT); + descriptors.add(RESPONSE_TIMEOUT); + descriptors.add(QUERY); + descriptors.add(SCROLL_DURATION); + descriptors.add(PAGE_SIZE); + descriptors.add(INDEX); + descriptors.add(TYPE); + descriptors.add(FIELDS); + descriptors.add(SORT); + + return Collections.unmodifiableList(descriptors); + } + + @OnScheduled + public void setup(ProcessContext context) { + super.setup(context); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) + throws ProcessException { + + try { + if (isQueryFinished(context.getStateManager())) { + getLogger().trace( + "Query has been marked finished in the state manager. " + + "To run another query, clear the state."); + return; + } + } catch (IOException e) { + throw new ProcessException("Could not retrieve state", e); + } + + OkHttpClient okHttpClient = getClient(); + + FlowFile flowFile = session.create(); + + final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile) + .getValue(); + final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile) + .getValue(); + final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile) + .getValue(); + final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile) + .asInteger().intValue(); + final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS) + .evaluateAttributeExpressions(flowFile).getValue() : null; + final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT) + .evaluateAttributeExpressions(flowFile).getValue() : null; + final String scroll = context.getProperty(SCROLL_DURATION).isSet() ? context + .getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue() : null; + + // Authentication + final String username = context.getProperty(USERNAME).getValue(); + final String password = context.getProperty(PASSWORD).getValue(); + + final ComponentLog logger = getLogger(); + + try { + String scrollId = loadScrollId(context.getStateManager()); + + if (scrollId != null) { + // read the url property from the context + final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL) + .getValue()); + final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort, + scrollId, pageSize, scroll); + final long startNanos = System.nanoTime(); + + final Response getResponse = sendRequestToElasticsearch(okHttpClient, scrollurl, + username, password, "GET", null); + this.getPage(getResponse, scrollurl, context, session, flowFile, logger, startNanos); + } else { + logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[] { index, + docType, query }); + + // read the url property from the context + final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL) + .getValue()); + final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort, + scrollId, pageSize, scroll); + final long startNanos = System.nanoTime(); + + final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl, + username, password, "GET", null); + this.getPage(getResponse, queryUrl, context, session, flowFile, logger, startNanos); + } + + } catch (IOException ioe) { + logger.error( + "Failed to read from Elasticsearch due to {}, this may indicate an error in configuration " + + "(hosts, username/password, etc.).", + new Object[] { ioe.getLocalizedMessage() }, ioe); + session.remove(flowFile); + context.yield(); + + } catch (Exception e) { + logger.error("Failed to read {} from Elasticsearch due to {}", new Object[] { flowFile, + e.getLocalizedMessage() }, e); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + } + } + + private void getPage(final Response getResponse, final URL url, final ProcessContext context, + final ProcessSession session, FlowFile flowFile, final ComponentLog logger, final long startNanos) + throws IOException { + final int statusCode = getResponse.code(); + + if (isSuccess(statusCode)) { + ResponseBody body = getResponse.body(); + final byte[] bodyBytes = body.bytes(); + JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes)); + String scrollId = responseJson.get("_scroll_id").asText(); + + StringBuilder builder = new StringBuilder(); + + builder.append("{ \"hits\" : ["); + + JsonNode hits = responseJson.get("hits").get("hits"); + if (hits.size() == 0) { + finishQuery(context.getStateManager()); + session.remove(flowFile); + return; + } + + for(int i = 0; i < hits.size(); i++) { + JsonNode hit = hits.get(i); + String retrievedIndex = hit.get("_index").asText(); + String retrievedType = hit.get("_type").asText(); + + JsonNode source = hit.get("_source"); + flowFile = session.putAttribute(flowFile, "es.index", retrievedIndex); + flowFile = session.putAttribute(flowFile, "es.type", retrievedType); + + builder.append(source.toString()); + if (i < hits.size() - 1) { + builder.append(", "); + } + } + builder.append("] }"); + logger.debug("Elasticsearch retrieved " + responseJson.size() + " documents, routing to success"); + + flowFile = session.write(flowFile, out -> { + out.write(builder.toString().getBytes()); + }); + session.transfer(flowFile, REL_SUCCESS); + + saveScrollId(context.getStateManager(), scrollId); + + // emit provenance event + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().receive(flowFile, url.toExternalForm(), millis); + } else { + // 5xx -> RETRY, but a server error might last a while, so yield + if (statusCode / 100 == 5) { + + logger.warn("Elasticsearch returned code {} with message {}, removing the flow file. This is likely a server problem, yielding...", + new Object[]{statusCode, getResponse.message()}); + session.remove(flowFile); + context.yield(); + } else { + logger.warn("Elasticsearch returned code {} with message {}", new Object[]{statusCode, getResponse.message()}); + session.remove(flowFile); + } + } + } + + private boolean isQueryFinished(StateManager stateManager) throws IOException { + final StateMap stateMap = stateManager.getState(Scope.LOCAL); + + if (stateMap.getVersion() < 0) { + getLogger().debug("No previous state found"); + return false; + } + + final String isQueryFinished = stateMap.get(FINISHED_QUERY_STATE); + getLogger().debug("Loaded state with finishedQuery = {}", new Object[] { isQueryFinished }); + + return "true".equals(isQueryFinished); + } + + private String loadScrollId(StateManager stateManager) throws IOException { + final StateMap stateMap = stateManager.getState(Scope.LOCAL); + + if (stateMap.getVersion() < 0) { + getLogger().debug("No previous state found"); + return null; + } + + final String scrollId = stateMap.get(SCROLL_ID_STATE); + getLogger().debug("Loaded state with scrollId {}", new Object[] { scrollId }); + + return scrollId; + } + + private void finishQuery(StateManager stateManager) throws IOException { + + Map state = new HashMap<>(2); + state.put(FINISHED_QUERY_STATE, "true"); + + getLogger().debug("Saving state with finishedQuery = true"); + stateManager.setState(state, Scope.LOCAL); + } + + private void saveScrollId(StateManager stateManager, String scrollId) throws IOException { + + Map state = new HashMap<>(2); + state.put(SCROLL_ID_STATE, scrollId); + + getLogger().debug("Saving state with scrollId of {}", new Object[] { scrollId }); + stateManager.setState(state, Scope.LOCAL); + } + + private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields, + String sort, String scrollId, int pageSize, String scroll) throws MalformedURLException { + if (StringUtils.isEmpty(baseUrl)) { + throw new MalformedURLException("Base URL cannot be null"); + } + HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder(); + if (!StringUtils.isEmpty(scrollId)) { + builder.addPathSegment("_search"); + builder.addPathSegment("scroll"); + builder.addQueryParameter(SCROLL_ID_QUERY_PARAM, scrollId); + } else { + builder.addPathSegment(index); + builder.addPathSegment((StringUtils.isEmpty(type)) ? "_all" : type); + builder.addPathSegment("_search"); + builder.addQueryParameter(QUERY_QUERY_PARAM, query); + builder.addQueryParameter(SIZE_QUERY_PARAM, String.valueOf(pageSize)); + if (!StringUtils.isEmpty(fields)) { + String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(",")); + builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields); + } + if (!StringUtils.isEmpty(sort)) { + String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(",")); + builder.addQueryParameter(SORT_QUERY_PARAM, trimmedFields); + } + } + builder.addQueryParameter(SCROLL_QUERY_PARAM, scroll); + + return builder.build().url(); + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UnretryableException.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UnretryableException.java new file mode 100644 index 0000000000..bae83cff57 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UnretryableException.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +/** + * Represents an unrecoverable error from ElasticSearch. + * @author jgresock + * + */ +public class UnretryableException extends RuntimeException { + private static final long serialVersionUID = -4528006567211380914L; + + public UnretryableException() { + super(); + } + + public UnretryableException(String message, Throwable cause) { + super(message, cause); + } + + public UnretryableException(String message) { + super(message); + } + + public UnretryableException(Throwable cause) { + super(cause); + } + +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 782f87e89a..a6cd087d76 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -16,3 +16,5 @@ org.apache.nifi.processors.elasticsearch.FetchElasticsearch org.apache.nifi.processors.elasticsearch.PutElasticsearch org.apache.nifi.processors.elasticsearch.FetchElasticsearchHttp org.apache.nifi.processors.elasticsearch.PutElasticsearchHttp +org.apache.nifi.processors.elasticsearch.QueryElasticsearchHttp +org.apache.nifi.processors.elasticsearch.ScrollElasticsearchHttp diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java new file mode 100644 index 0000000000..15f2707fb6 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Test; + +public class ITQueryElasticsearchHttp { + + private TestRunner runner; + + @After + public void teardown() { + runner = null; + } + + @Test + public void testFetchElasticsearchOnTrigger() throws IOException { + runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, + "http://localhost.internal:9200"); + + runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.TYPE, "provenance"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.QUERY, + "identifier:2f79eba8839f5976cd0b1e16a0e7fe8d7dd0ceca"); + runner.setProperty(QueryElasticsearchHttp.SORT, "timestamp:asc"); + runner.setProperty(QueryElasticsearchHttp.FIELDS, "transit_uri,version"); + runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "1"); + runner.assertValid(); + + runner.setIncomingConnection(false); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_SUCCESS, 3); + final MockFlowFile out = runner.getFlowFilesForRelationship( + QueryElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + } + + @Test + public void testFetchElasticsearchOnTrigger_IncomingFile() throws IOException { + runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, + "http://localhost.internal:9200"); + + runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.TYPE, "provenance"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.QUERY, "${query}"); + runner.setProperty(QueryElasticsearchHttp.SORT, "timestamp:asc"); + runner.setProperty(QueryElasticsearchHttp.FIELDS, "transit_uri,version"); + runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "1"); + runner.assertValid(); + + Map attributes = new HashMap<>(); + attributes.put("query", "identifier:2f79eba8839f5976cd0b1e16a0e7fe8d7dd0ceca"); + runner.enqueue("".getBytes(), attributes); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_SUCCESS, 3); + final MockFlowFile out = runner.getFlowFilesForRelationship( + QueryElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java new file mode 100644 index 0000000000..aa2a1e0c75 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Test; + +public class ITScrollElasticsearchHttp { + + private TestRunner runner; + + @After + public void teardown() { + runner = null; + } + + @Test + public void testFetchElasticsearchOnTrigger() throws IOException { + runner = TestRunners.newTestRunner(ScrollElasticsearchHttp.class); // all docs are found + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, + "http://ip-172-31-49-152.ec2.internal:9200"); + + runner.setProperty(ScrollElasticsearchHttp.INDEX, "prod-accounting"); + runner.assertNotValid(); + runner.setProperty(ScrollElasticsearchHttp.TYPE, "provenance"); + runner.assertNotValid(); + runner.setProperty(ScrollElasticsearchHttp.QUERY, + "identifier:2f79eba8839f5976cd0b1e16a0e7fe8d7dd0ceca"); + runner.setProperty(ScrollElasticsearchHttp.SORT, "timestamp:asc"); + runner.setProperty(ScrollElasticsearchHttp.FIELDS, "transit_uri,version"); + runner.setProperty(ScrollElasticsearchHttp.PAGE_SIZE, "1"); + runner.assertValid(); + + runner.setIncomingConnection(false); + runner.run(4, true, true); + + runner.assertAllFlowFilesTransferred(ScrollElasticsearchHttp.REL_SUCCESS, 3); + final MockFlowFile out = runner.getFlowFilesForRelationship( + ScrollElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java new file mode 100644 index 0000000000..b9ec1f98af --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java @@ -0,0 +1,443 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.mockito.stubbing.OngoingStubbing; + +import okhttp3.Call; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Protocol; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.ResponseBody; + +public class TestQueryElasticsearchHttp { + + private TestRunner runner; + + @After + public void teardown() { + runner = null; + } + + @Test + public void testQueryElasticsearchOnTrigger_withInput() throws IOException { + runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor()); + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.QUERY, + "source:Twitter AND identifier:\"${identifier}\""); + runner.assertValid(); + runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2"); + runner.assertValid(); + + runAndVerifySuccess(true); + } + + @Test + public void testQueryElasticsearchOnTrigger_withInput_attributeTarget() throws IOException { + runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor()); + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.QUERY, + "source:Twitter AND identifier:\"${identifier}\""); + runner.assertValid(); + runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2"); + runner.assertValid(); + runner.setProperty(QueryElasticsearchHttp.TARGET, + QueryElasticsearchHttp.TARGET_FLOW_FILE_ATTRIBUTES); + + runAndVerifySuccess(false); + final MockFlowFile out = runner.getFlowFilesForRelationship( + QueryElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + assertEquals("blah", new String(out.toByteArray())); + assertEquals("Twitter", out.getAttribute("es.result.source")); + } + + @Test + public void testQueryElasticsearchOnTrigger_withNoInput() throws IOException { + runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor()); + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.QUERY, + "source:Twitter AND identifier:\"${identifier}\""); + runner.assertValid(); + runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2"); + runner.assertValid(); + + runner.setIncomingConnection(false); + runAndVerifySuccess(true); + } + + private void runAndVerifySuccess(int expectedResults, boolean targetIsContent) { + runner.enqueue("blah".getBytes(), new HashMap() { + { + put("identifier", "28039652140"); + } + }); + + // Running once should page through all 3 docs + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_SUCCESS, expectedResults); + final MockFlowFile out = runner.getFlowFilesForRelationship( + QueryElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + if (targetIsContent) { + out.assertAttributeEquals("filename", "abc-97b-ASVsZu_" + + "vShwtGCJpGOObmuSqUJRUC3L_-SEND-S3"); + } + } + + // By default, 3 files should go to Success + private void runAndVerifySuccess(boolean targetIsContent) { + runAndVerifySuccess(3, targetIsContent); + } + + @Test + public void testQueryElasticsearchOnTriggerWithFields() throws IOException { + runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor()); + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}"); + runner.assertValid(); + runner.setProperty(QueryElasticsearchHttp.FIELDS, "id,, userinfo.location"); + runner.assertValid(); + runner.setProperty(QueryElasticsearchHttp.SORT, "timestamp:asc,identifier:desc"); + runner.assertValid(); + + runAndVerifySuccess(true); + } + + @Test + public void testQueryElasticsearchOnTriggerWithLimit() throws IOException { + runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor()); + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}"); + runner.assertValid(); + runner.setProperty(QueryElasticsearchHttp.FIELDS, "id,, userinfo.location"); + runner.assertValid(); + runner.setProperty(QueryElasticsearchHttp.SORT, "timestamp:asc,identifier:desc"); + runner.assertValid(); + runner.setProperty(QueryElasticsearchHttp.LIMIT, "2"); + + runAndVerifySuccess(2, true); + } + + @Test + public void testQueryElasticsearchOnTriggerWithServerErrorRetry() throws IOException { + QueryElasticsearchHttpTestProcessor processor = new QueryElasticsearchHttpTestProcessor(); + processor.setStatus(500, "Server error"); + runner = TestRunners.newTestRunner(processor); // simulate doc not found + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); + runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); + runner.setValidateExpressionUsage(true); + runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}"); + + runner.enqueue("".getBytes(), new HashMap() { + { + put("identifier", "28039652140"); + } + }); + + runner.run(1, true, true); + + // This test generates a HTTP 500 "Server error" + runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_RETRY, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship( + QueryElasticsearchHttp.REL_RETRY).get(0); + assertNotNull(out); + } + + @Test + public void testQueryElasticsearchOnTriggerWithServerFail() throws IOException { + QueryElasticsearchHttpTestProcessor processor = new QueryElasticsearchHttpTestProcessor(); + processor.setStatus(100, "Should fail"); + runner = TestRunners.newTestRunner(processor); // simulate doc not found + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); + runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); + runner.setValidateExpressionUsage(true); + runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}"); + + runner.enqueue("".getBytes(), new HashMap() { + { + put("identifier", "28039652140"); + } + }); + + runner.run(1, true, true); + + // This test generates a HTTP 100 "Should fail" + runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_FAILURE, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship( + QueryElasticsearchHttp.REL_FAILURE).get(0); + assertNotNull(out); + } + + @Test + public void testQueryElasticsearchOnTriggerWithIOException() throws IOException { + QueryElasticsearchHttpTestProcessor processor = new QueryElasticsearchHttpTestProcessor(); + processor.setExceptionToThrow(new IOException("Error reading from disk")); + runner = TestRunners.newTestRunner(processor); // simulate doc not found + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); + runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); + runner.setValidateExpressionUsage(true); + runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}"); + + runner.enqueue("".getBytes(), new HashMap() { + { + put("identifier", "28039652140"); + } + }); + + runner.run(1, true, true); + + // This test generates a HTTP 100 "Should fail" + runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_RETRY, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship( + QueryElasticsearchHttp.REL_RETRY).get(0); + assertNotNull(out); + } + + @Test + public void testQueryElasticsearchOnTriggerWithServerFailAfterSuccess() throws IOException { + QueryElasticsearchHttpTestProcessor processor = new QueryElasticsearchHttpTestProcessor(); + processor.setStatus(100, "Should fail", 2); + runner = TestRunners.newTestRunner(processor); // simulate doc not found + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); + runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); + runner.setValidateExpressionUsage(true); + runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}"); + + runner.enqueue("".getBytes(), new HashMap() { + { + put("identifier", "28039652140"); + } + }); + + runner.run(1, true, true); + + // This test generates a HTTP 100 "Should fail" + runner.assertTransferCount(QueryElasticsearchHttp.REL_SUCCESS, 2); + runner.assertTransferCount(QueryElasticsearchHttp.REL_FAILURE, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship( + QueryElasticsearchHttp.REL_FAILURE).get(0); + assertNotNull(out); + } + + @Test + public void testQueryElasticsearchOnTriggerWithServerFailNoIncomingFlowFile() throws IOException { + QueryElasticsearchHttpTestProcessor processor = new QueryElasticsearchHttpTestProcessor(); + processor.setStatus(100, "Should fail", 1); + runner = TestRunners.newTestRunner(processor); // simulate doc not found + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); + runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); + runner.setValidateExpressionUsage(true); + runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}"); + + runner.setIncomingConnection(false); + runner.run(1, true, true); + + // This test generates a HTTP 100 with no incoming flow file, so nothing should be transferred + processor.getRelationships().forEach(relationship -> runner.assertTransferCount(relationship, 0)); + runner.assertTransferCount(QueryElasticsearchHttp.REL_FAILURE, 0); + } + + @Test + public void testSetupSecureClient() throws Exception { + QueryElasticsearchHttpTestProcessor processor = new QueryElasticsearchHttpTestProcessor(); + runner = TestRunners.newTestRunner(processor); + SSLContextService sslService = mock(SSLContextService.class); + when(sslService.getIdentifier()).thenReturn("ssl-context"); + runner.addControllerService("ssl-context", sslService); + runner.enableControllerService(sslService); + runner.setProperty(QueryElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context"); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); + runner.setValidateExpressionUsage(true); + runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}"); + + // Allow time for the controller service to fully initialize + Thread.sleep(500); + + runner.enqueue("".getBytes(), new HashMap() { + { + put("doc_id", "28039652140"); + } + }); + runner.run(1, true, true); + } + + /** + * A Test class that extends the processor in order to inject/mock behavior + */ + private static class QueryElasticsearchHttpTestProcessor extends QueryElasticsearchHttp { + Exception exceptionToThrow = null; + OkHttpClient client; + int goodStatusCode = 200; + String goodStatusMessage = "OK"; + + int badStatusCode; + String badStatusMessage; + int runNumber; + + List pages = Arrays.asList(getDoc("query-page1.json"), getDoc("query-page2.json"), + getDoc("query-page3.json")); + + public void setExceptionToThrow(Exception exceptionToThrow) { + this.exceptionToThrow = exceptionToThrow; + } + + /** + * Sets the status code and message for the 1st query + * + * @param code + * The status code to return + * @param message + * The status message + */ + void setStatus(int code, String message) { + this.setStatus(code, message, 1); + } + + /** + * Sets the status code and message for the runNumber-th query + * + * @param code + * The status code to return + * @param message + * The status message + * @param runNumber + * The run number for which to set this status + */ + void setStatus(int code, String message, int runNumber) { + badStatusCode = code; + badStatusMessage = message; + this.runNumber = runNumber; + } + + @Override + protected void createElasticsearchClient(ProcessContext context) throws ProcessException { + client = mock(OkHttpClient.class); + + OngoingStubbing stub = when(client.newCall(any(Request.class))); + + for (int i = 0; i < pages.size(); i++) { + String page = pages.get(i); + if (runNumber == i + 1) { + stub = mockReturnDocument(stub, page, badStatusCode, badStatusMessage); + } else { + stub = mockReturnDocument(stub, page, goodStatusCode, goodStatusMessage); + } + } + } + + private OngoingStubbing mockReturnDocument(OngoingStubbing stub, + final String document, int statusCode, String statusMessage) { + return stub.thenAnswer(new Answer() { + + @Override + public Call answer(InvocationOnMock invocationOnMock) throws Throwable { + Request realRequest = (Request) invocationOnMock.getArguments()[0]; + Response mockResponse = new Response.Builder() + .request(realRequest) + .protocol(Protocol.HTTP_1_1) + .code(statusCode) + .message(statusMessage) + .body(ResponseBody.create(MediaType.parse("application/json"), document)) + .build(); + final Call call = mock(Call.class); + if (exceptionToThrow != null) { + when(call.execute()).thenThrow(exceptionToThrow); + } else { + when(call.execute()).thenReturn(mockResponse); + } + return call; + } + }); + } + + protected OkHttpClient getClient() { + return client; + } + } + + private static String getDoc(String filename) { + try { + return IOUtils.toString(QueryElasticsearchHttp.class.getClassLoader() + .getResourceAsStream(filename)); + } catch (IOException e) { + System.out.println("Error reading document " + filename); + return ""; + } + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java new file mode 100644 index 0000000000..2616269257 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.mockito.stubbing.OngoingStubbing; + +import okhttp3.Call; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Protocol; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.ResponseBody; + +public class TestScrollElasticsearchHttp { + + private TestRunner runner; + + @After + public void teardown() { + runner = null; + } + + @Test + public void testScrollElasticsearchOnTrigger_withNoInput() throws IOException { + runner = TestRunners.newTestRunner(new ScrollElasticsearchHttpTestProcessor()); + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(ScrollElasticsearchHttp.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(ScrollElasticsearchHttp.QUERY, + "source:WZ AND identifier:\"${identifier}\""); + runner.assertValid(); + runner.setProperty(ScrollElasticsearchHttp.PAGE_SIZE, "2"); + runner.assertValid(); + + runner.setIncomingConnection(false); + runAndVerifySuccess(); + } + + private void runAndVerifySuccess() { + runner.enqueue("".getBytes(), new HashMap() { + { + put("identifier", "28039652140"); + } + }); + + // Must run once for each of the 3 pages + runner.run(3, true, true); + + runner.assertAllFlowFilesTransferred(ScrollElasticsearchHttp.REL_SUCCESS, 2); + final MockFlowFile out = runner.getFlowFilesForRelationship( + ScrollElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + + int numHits = runner.getFlowFilesForRelationship( + ScrollElasticsearchHttp.REL_SUCCESS).stream().map(ff -> { + String page = new String(ff.toByteArray()); + return StringUtils.countMatches(page, "{\"timestamp\""); + }) + .reduce((a, b) -> a + b).get(); + Assert.assertEquals(3, numHits); + } + + @Test + public void testScrollElasticsearchOnTriggerWithFields() throws IOException { + runner = TestRunners.newTestRunner(new ScrollElasticsearchHttpTestProcessor()); + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(ScrollElasticsearchHttp.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}"); + runner.assertValid(); + runner.setProperty(ScrollElasticsearchHttp.FIELDS, "id,, userinfo.location"); + runner.assertValid(); + runner.setProperty(ScrollElasticsearchHttp.SORT, "timestamp:asc,identifier:desc"); + runner.assertValid(); + runner.setIncomingConnection(false); + + runAndVerifySuccess(); + } + + @Test + public void testScrollElasticsearchOnTriggerWithServerFail() throws IOException { + ScrollElasticsearchHttpTestProcessor processor = new ScrollElasticsearchHttpTestProcessor(); + processor.setStatus(100, "Should fail"); + runner = TestRunners.newTestRunner(processor); // simulate doc not found + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc"); + runner.setProperty(ScrollElasticsearchHttp.TYPE, "status"); + runner.setValidateExpressionUsage(true); + runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}"); + runner.setIncomingConnection(false); + + runner.enqueue("".getBytes(), new HashMap() { + { + put("identifier", "28039652140"); + } + }); + + runner.run(1, true, true); + + // This test generates a HTTP 100 "Should fail" + runner.assertTransferCount(ScrollElasticsearchHttp.REL_FAILURE, 0); + runner.assertTransferCount(ScrollElasticsearchHttp.REL_SUCCESS, 0); + } + + @Test + public void testScrollElasticsearchOnTriggerWithServerRetry() throws IOException { + ScrollElasticsearchHttpTestProcessor processor = new ScrollElasticsearchHttpTestProcessor(); + processor.setStatus(500, "Internal error"); + runner = TestRunners.newTestRunner(processor); // simulate doc not found + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc"); + runner.setProperty(ScrollElasticsearchHttp.TYPE, "status"); + runner.setValidateExpressionUsage(true); + runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}"); + runner.setIncomingConnection(false); + + runner.enqueue("".getBytes(), new HashMap() { + { + put("identifier", "28039652140"); + } + }); + + runner.run(1, true, true); + + // This test generates a HTTP 500 "Internal error" + runner.assertTransferCount(ScrollElasticsearchHttp.REL_FAILURE, 0); + runner.assertTransferCount(ScrollElasticsearchHttp.REL_SUCCESS, 0); + } + + @Test + public void testScrollElasticsearchOnTriggerWithServerFailAfterSuccess() throws IOException { + ScrollElasticsearchHttpTestProcessor processor = new ScrollElasticsearchHttpTestProcessor(); + processor.setStatus(100, "Should fail", 2); + runner = TestRunners.newTestRunner(processor); // simulate doc not found + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc"); + runner.setProperty(ScrollElasticsearchHttp.TYPE, "status"); + runner.setValidateExpressionUsage(true); + runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}"); + + runner.enqueue("".getBytes(), new HashMap() { + { + put("identifier", "28039652140"); + } + }); + runner.setIncomingConnection(false); + + runner.run(1, true, true); + + // This test generates a HTTP 100 "Should fail" + runner.assertTransferCount(ScrollElasticsearchHttp.REL_SUCCESS, 1); + runner.assertTransferCount(ScrollElasticsearchHttp.REL_FAILURE, 0); + } + + @Test + public void testScrollElasticsearchOnTriggerWithServerFailNoIncomingFlowFile() throws IOException { + ScrollElasticsearchHttpTestProcessor processor = new ScrollElasticsearchHttpTestProcessor(); + processor.setStatus(100, "Should fail", 1); + runner = TestRunners.newTestRunner(processor); // simulate doc not found + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc"); + runner.setProperty(ScrollElasticsearchHttp.TYPE, "status"); + runner.setValidateExpressionUsage(true); + runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}"); + + runner.setIncomingConnection(false); + runner.run(1, true, true); + + // This test generates a HTTP 100 with no incoming flow file, so nothing should be transferred + processor.getRelationships().forEach(relationship -> runner.assertTransferCount(relationship, 0)); + runner.assertTransferCount(ScrollElasticsearchHttp.REL_FAILURE, 0); + } + + @Test + public void testSetupSecureClient() throws Exception { + ScrollElasticsearchHttpTestProcessor processor = new ScrollElasticsearchHttpTestProcessor(); + runner = TestRunners.newTestRunner(processor); + SSLContextService sslService = mock(SSLContextService.class); + when(sslService.getIdentifier()).thenReturn("ssl-context"); + runner.addControllerService("ssl-context", sslService); + runner.enableControllerService(sslService); + runner.setProperty(ScrollElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context"); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc"); + runner.setValidateExpressionUsage(true); + runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}"); + runner.setIncomingConnection(false); + + // Allow time for the controller service to fully initialize + Thread.sleep(500); + + runner.enqueue("".getBytes(), new HashMap() { + { + put("doc_id", "28039652140"); + } + }); + runner.run(1, true, true); + + } + + @Test + public void testScrollElasticsearchOnTriggerWithIOException() throws IOException { + ScrollElasticsearchHttpTestProcessor processor = new ScrollElasticsearchHttpTestProcessor(); + processor.setExceptionToThrow(new IOException("Error reading from disk")); + runner = TestRunners.newTestRunner(processor); // simulate doc not found + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc"); + runner.setProperty(ScrollElasticsearchHttp.TYPE, "status"); + runner.setValidateExpressionUsage(true); + runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}"); + + runner.enqueue("".getBytes(), new HashMap() { + { + put("identifier", "28039652140"); + } + }); + + runner.run(1, true, true); + + // This test generates a HTTP 100 "Should fail" + runner.assertTransferCount(ScrollElasticsearchHttp.REL_SUCCESS, 0); + runner.assertTransferCount(ScrollElasticsearchHttp.REL_FAILURE, 0); + } + + @Test + public void testScrollElasticsearchOnTriggerWithOtherException() throws IOException { + ScrollElasticsearchHttpTestProcessor processor = new ScrollElasticsearchHttpTestProcessor(); + processor.setExceptionToThrow(new IllegalArgumentException("Error reading from disk")); + runner = TestRunners.newTestRunner(processor); // simulate doc not found + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc"); + runner.setProperty(ScrollElasticsearchHttp.TYPE, "status"); + runner.setValidateExpressionUsage(true); + runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}"); + + runner.enqueue("".getBytes(), new HashMap() { + { + put("identifier", "28039652140"); + } + }); + + runner.run(1, true, true); + + // This test generates a HTTP 100 "Should fail" + runner.assertTransferCount(ScrollElasticsearchHttp.REL_SUCCESS, 0); + runner.assertTransferCount(ScrollElasticsearchHttp.REL_FAILURE, 1); + } + + /** + * A Test class that extends the processor in order to inject/mock behavior + */ + private static class ScrollElasticsearchHttpTestProcessor extends ScrollElasticsearchHttp { + Exception exceptionToThrow = null; + OkHttpClient client; + int goodStatusCode = 200; + String goodStatusMessage = "OK"; + + int badStatusCode; + String badStatusMessage; + int runNumber; + + List pages = Arrays.asList(getDoc("scroll-page1.json"), + getDoc("scroll-page2.json"), getDoc("scroll-page3.json")); + + public void setExceptionToThrow(Exception exceptionToThrow) { + this.exceptionToThrow = exceptionToThrow; + } + + /** + * Sets the status code and message for the 1st query + * + * @param code + * The status code to return + * @param message + * The status message + */ + void setStatus(int code, String message) { + this.setStatus(code, message, 1); + } + + /** + * Sets the status code and message for the runNumber-th query + * + * @param code + * The status code to return + * @param message + * The status message + * @param runNumber + * The run number for which to set this status + */ + void setStatus(int code, String message, int runNumber) { + badStatusCode = code; + badStatusMessage = message; + this.runNumber = runNumber; + } + + @Override + protected void createElasticsearchClient(ProcessContext context) throws ProcessException { + client = mock(OkHttpClient.class); + + OngoingStubbing stub = when(client.newCall(any(Request.class))); + + for (int i = 0; i < pages.size(); i++) { + String page = pages.get(i); + if (runNumber == i + 1) { + stub = mockReturnDocument(stub, page, badStatusCode, badStatusMessage); + } else { + stub = mockReturnDocument(stub, page, goodStatusCode, goodStatusMessage); + } + } + } + + private OngoingStubbing mockReturnDocument(OngoingStubbing stub, + final String document, int statusCode, String statusMessage) { + return stub.thenAnswer(new Answer() { + + @Override + public Call answer(InvocationOnMock invocationOnMock) throws Throwable { + Request realRequest = (Request) invocationOnMock.getArguments()[0]; + Response mockResponse = new Response.Builder() + .request(realRequest) + .protocol(Protocol.HTTP_1_1) + .code(statusCode) + .message(statusMessage) + .body(ResponseBody.create(MediaType.parse("application/json"), document)) + .build(); + final Call call = mock(Call.class); + if (exceptionToThrow != null) { + when(call.execute()).thenThrow(exceptionToThrow); + } else { + when(call.execute()).thenReturn(mockResponse); + } + return call; + } + }); + } + + protected OkHttpClient getClient() { + return client; + } + } + + private static String getDoc(String filename) { + try { + return IOUtils.toString(ScrollElasticsearchHttp.class.getClassLoader() + .getResourceAsStream(filename)); + } catch (IOException e) { + System.out.println("Error reading document " + filename); + return ""; + } + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/query-page1.json b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/query-page1.json new file mode 100644 index 0000000000..50930761f8 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/query-page1.json @@ -0,0 +1,57 @@ +{ + + "took": 3, + "timed_out": false, + "_shards": { + "total": 5, + "successful": 5, + "failed": 0 + }, + "hits": { + "total": 3, + "max_score": null, + "hits": [ + { + "_index": "myindex", + "_type": "provenance", + "_id": "abc-97b-ASVsZu_vShwtGCJpGOObmuSqUJRUC3L_-SEND-S3", + "_score": null, + "_source": { + "timestamp": "2016-07-22T14:47:08.102Z", + "event_type": "SEND", + "source": "Twitter", + "identifier": "abc-97b", + "transit_type": "S3", + "transit_uri": "file://cluster2/data/outgoing/S3/abc-97b.zip", + "object_type": "Provenance Record", + "version": "ASVsZu_vShwtGCJpGOObmuSqUJRUC3L_", + "file_size": "3645525" + }, + "sort": [ + 1469198828102 + ] + }, + { + "_index": "myindex", + "_type": "provenance", + "_id": "abc-a78-SjJkrwnv6edIRqJChEYzrE7PeT1hzioz-SEND-S3", + "_score": null, + "_source": { + "timestamp": "2016-07-22T14:47:08.101Z", + "event_type": "SEND", + "source": "Twitter", + "identifier": "abc-a78", + "transit_type": "S3", + "transit_uri": "file://cluster2/data/outgoing/S3/abc-a78.zip", + "object_type": "Provenance Record", + "version": "SjJkrwnv6edIRqJChEYzrE7PeT1hzioz", + "file_size": "4480294" + }, + "sort": [ + 1469198828101 + ] + + } + ] + } + } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/query-page2.json b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/query-page2.json new file mode 100644 index 0000000000..1ea27b33f4 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/query-page2.json @@ -0,0 +1,36 @@ +{ + + "took": 3, + "timed_out": false, + "_shards": { + "total": 5, + "successful": 5, + "failed": 0 + }, + "hits": { + "total": 3, + "max_score": null, + "hits": [ + { + "_index": "myindex", + "_type": "provenance", + "_id": "abc-42a-ArPsIlGBKqDvfL6qQZOVpmDwUEB.nynh-SEND-S3", + "_score": null, + "_source": { + "timestamp": "2016-07-22T14:47:08.101Z", + "event_type": "SEND", + "source": "Twitter", + "identifier": "abc-42a", + "transit_type": "S3", + "transit_uri": "file://cluster2/data/outgoing/S3/abc-42a.zip", + "object_type": "Provenance Record", + "version": "ArPsIlGBKqDvfL6qQZOVpmDwUEB.nynh", + "file_size": "18206872" + }, + "sort": [ + 1469198828101 + ] + } + ] + } + } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/query-page3.json b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/query-page3.json new file mode 100644 index 0000000000..a495930b01 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/query-page3.json @@ -0,0 +1,14 @@ +{ + "took": 6, + "timed_out": false, + "_shards": { + "total": 5, + "successful": 5, + "failed": 0 + }, + "hits": { + "total": 3, + "max_score": null, + "hits": [ ] + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/scroll-page1.json b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/scroll-page1.json new file mode 100644 index 0000000000..8d8c94063a --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/scroll-page1.json @@ -0,0 +1,56 @@ +{ + "_scroll_id": "cXVlcnlUaGVuRmV0Y2g7NTsyMDU3NjU6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3NjY6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3Njg6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3Njk6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3Njc6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzswOw==", + "took": 4, + "timed_out": false, + "_shards": { + "total": 5, + "successful": 5, + "failed": 0 + }, + "hits": { + "total": 3, + "max_score": null, + "hits": [ + { + "_index": "myindex", + "_type": "provenance", + "_id": "abc-97b-ASVsZu_vShwtGCJpGOObmuSqUJRUC3L_-SEND-S3", + "_score": null, + "_source": { + "timestamp": "2016-07-22T14:47:08.102Z", + "event_type": "SEND", + "source": "Twitter", + "identifier": "abc-97b", + "transit_type": "S3", + "transit_uri": "file://cluster2/data/outgoing/S3/abc-97b.zip", + "object_type": "Provenance Record", + "version": "ASVsZu_vShwtGCJpGOObmuSqUJRUC3L_", + "file_size": "3645525" + }, + "sort": [ + 1469198828102 + ] + }, + { + "_index": "myindex", + "_type": "provenance", + "_id": "abc-a78-SjJkrwnv6edIRqJChEYzrE7PeT1hzioz-SEND-S3", + "_score": null, + "_source": { + "timestamp": "2016-07-22T14:47:08.101Z", + "event_type": "SEND", + "source": "Twitter", + "identifier": "abc-a78", + "transit_type": "S3", + "transit_uri": "file://cluster2/data/outgoing/S3/abc-a78.zip", + "object_type": "Provenance Record", + "version": "SjJkrwnv6edIRqJChEYzrE7PeT1hzioz", + "file_size": "4480294" + }, + "sort": [ + 1469198828101 + ] + } + ] + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/scroll-page2.json b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/scroll-page2.json new file mode 100644 index 0000000000..c02dfdc991 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/scroll-page2.json @@ -0,0 +1,36 @@ +{ + "_scroll_id": "dXVlcnlUaGVuRmV0Y2g7NTsyMDU3NjU6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3NjY6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3Njg6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3Njk6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3Njc6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzswOw==", + "took": 4, + "timed_out": false, + "_shards": { + "total": 5, + "successful": 5, + "failed": 0 + }, + "hits": { + "total": 3, + "max_score": null, + "hits": [ + { + "_index": "myindex", + "_type": "provenance", + "_id": "abc-97b-ASVsZu_vShwtGCJpGOObmuSqUJRUC3L_-SEND-S3", + "_score": null, + "_source": { + "timestamp": "2016-07-22T14:47:08.102Z", + "event_type": "SEND", + "source": "Twitter", + "identifier": "abc-97b", + "transit_type": "S3", + "transit_uri": "file://cluster2/data/outgoing/S3/abc-97b.zip", + "object_type": "Provenance Record", + "version": "ASVsZu_vShwtGCJpGOObmuSqUJRUC3L_", + "file_size": "3645525" + }, + "sort": [ + 1469198828102 + ] + } + ] + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/scroll-page3.json b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/scroll-page3.json new file mode 100644 index 0000000000..90bd3ba20f --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/scroll-page3.json @@ -0,0 +1,15 @@ +{ + "_scroll_id": "eXVlcnlUaGVuRmV0Y2g7NTsyMDU3NjU6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3NjY6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3Njg6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3Njk6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3Njc6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzswOw==", + "took": 4, + "timed_out": false, + "_shards": { + "total": 5, + "successful": 5, + "failed": 0 + }, + "hits": { + "total": 3, + "max_score": null, + "hits": [ ] + } +} \ No newline at end of file