NIFI-2417: Adding Query and Scroll processors for Elasticsearch

Signed-off-by: Matt Burgess <mattyb149@apache.org>
This commit is contained in:
Joe Gresock 2016-08-28 10:09:32 +00:00 committed by Matt Burgess
parent e973874793
commit 00412f6e97
16 changed files with 2126 additions and 1 deletions

View File

@ -92,7 +92,7 @@ language governing permissions and limitations under the License. -->
<artifactId>apache-rat-plugin</artifactId> <artifactId>apache-rat-plugin</artifactId>
<configuration> <configuration>
<excludes combine.children="append"> <excludes combine.children="append">
<exclude>src/test/resources/DocumentExample.json</exclude> <exclude>src/test/resources/*.json</exclude>
</excludes> </excludes>
</configuration> </configuration>
</plugin> </plugin>

View File

@ -0,0 +1,410 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.codehaus.jackson.JsonNode;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Response;
import okhttp3.ResponseBody;
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@EventDriven
@SupportsBatching
@Tags({ "elasticsearch", "query", "read", "get", "http" })
@CapabilityDescription("Queries Elasticsearch using the specified connection properties. "
+ "Note that the full body of each page of documents will be read into memory before being "
+ "written to Flow Files for transfer. Also note that the Elasticsearch max_result_window index "
+ "setting is the upper bound on the number of records that can be retrieved using this query. "
+ "To retrieve more records, use the ScrollElasticsearchHttp processor.")
@WritesAttributes({
@WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
@WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type"),
@WritesAttribute(attribute = "es.result.*", description = "If Target is 'Flow file attributes', the JSON attributes of "
+ "each result will be placed into corresponding attributes with this prefix.") })
public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
private static final String QUERY_QUERY_PARAM = "q";
private static final String SORT_QUERY_PARAM = "sort";
private static final String FROM_QUERY_PARAM = "from";
private static final String SIZE_QUERY_PARAM = "size";
public static final String TARGET_FLOW_FILE_CONTENT = "Flow file content";
public static final String TARGET_FLOW_FILE_ATTRIBUTES = "Flow file attributes";
private static final String ATTRIBUTE_PREFIX = "es.result.";
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description(
"All FlowFiles that are read from Elasticsearch are routed to this relationship.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description(
"All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming "
+ "flow files will be routed to failure.").build();
public static final Relationship REL_RETRY = new Relationship.Builder()
.name("retry")
.description(
"A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may "
+ "succeed. Note that if the processor has no incoming connections, flow files may still be sent to this relationship "
+ "based on the processor properties and the results of the fetch operation.")
.build();
public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
.name("query-es-query").displayName("Query")
.description("The Lucene-style query to run against ElasticSearch").required(true)
.expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
.name("query-es-index").displayName("Index")
.description("The name of the index to read from").required(true)
.expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("query-es-type")
.displayName("Type")
.description(
"The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set "
+ "to _all, the first document matching the identifier across all types will be retrieved.")
.required(false).expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
.name("query-es-fields")
.displayName("Fields")
.description(
"A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, "
+ "then the entire document's source will be retrieved.")
.required(false).expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
.name("query-es-sort")
.displayName("Sort")
.description(
"A sort parameter (e.g., timestamp:asc). If the Sort property is left blank, "
+ "then the results will be retrieved in document order.")
.required(false).expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
.name("query-es-size").displayName("Page Size").defaultValue("20")
.description("Determines how many documents to return per page during scrolling.")
.required(true).expressionLanguageSupported(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
public static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
.name("query-es-limit").displayName("Limit")
.description("If set, limits the number of results that will be returned.")
.required(false).expressionLanguageSupported(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
public static final PropertyDescriptor TARGET = new PropertyDescriptor.Builder()
.name("query-es-target")
.displayName("Target")
.description(
"Indicates where the results should be placed. In the case of 'Flow file content', the JSON "
+ "response will be written as the content of the flow file. In the case of 'Flow file attributes', "
+ "the original flow file (if applicable) will be cloned for each result, and all return fields will be placed "
+ "in a flow file attribute of the same name, but prefixed by 'es.result.'")
.required(true).expressionLanguageSupported(false)
.defaultValue(TARGET_FLOW_FILE_CONTENT)
.allowableValues(TARGET_FLOW_FILE_CONTENT, TARGET_FLOW_FILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
relationships.add(REL_RETRY);
return Collections.unmodifiableSet(relationships);
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(ES_URL);
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
descriptors.add(USERNAME);
descriptors.add(PASSWORD);
descriptors.add(CONNECT_TIMEOUT);
descriptors.add(RESPONSE_TIMEOUT);
descriptors.add(QUERY);
descriptors.add(PAGE_SIZE);
descriptors.add(INDEX);
descriptors.add(TYPE);
descriptors.add(FIELDS);
descriptors.add(SORT);
descriptors.add(LIMIT);
descriptors.add(TARGET);
return Collections.unmodifiableList(descriptors);
}
@OnScheduled
public void setup(ProcessContext context) {
super.setup(context);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session)
throws ProcessException {
FlowFile flowFile = null;
if (context.hasIncomingConnection()) {
flowFile = session.get();
// If we have no FlowFile, and all incoming connections are self-loops then we can
// continue on.
// However, if we have no FlowFile and we have connections coming from other Processors,
// then
// we know that we should run only if we have a FlowFile.
if (flowFile == null && context.hasNonLoopConnection()) {
return;
}
}
OkHttpClient okHttpClient = getClient();
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile)
.getValue();
final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile)
.getValue();
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile)
.getValue();
final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile)
.asInteger().intValue();
final Integer limit = context.getProperty(LIMIT).isSet() ? context.getProperty(LIMIT)
.evaluateAttributeExpressions(flowFile).asInteger().intValue() : null;
final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS)
.evaluateAttributeExpressions(flowFile).getValue() : null;
final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT)
.evaluateAttributeExpressions(flowFile).getValue() : null;
final boolean targetIsContent = context.getProperty(TARGET).getValue()
.equals(TARGET_FLOW_FILE_CONTENT);
// Authentication
final String username = context.getProperty(USERNAME).getValue();
final String password = context.getProperty(PASSWORD).getValue();
final ComponentLog logger = getLogger();
int fromIndex = 0;
int numResults;
try {
logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[] { index, docType,
query });
final long startNanos = System.nanoTime();
// read the url property from the context
final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).getValue());
boolean hitLimit = false;
do {
int mPageSize = pageSize;
if (limit != null && limit <= (fromIndex + pageSize)) {
mPageSize = limit - fromIndex;
hitLimit = true;
}
final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
mPageSize, fromIndex);
final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
username, password, "GET", null);
numResults = this.getPage(getResponse, queryUrl, context, session, flowFile,
logger, startNanos, targetIsContent);
fromIndex += pageSize;
} while (numResults > 0 && !hitLimit);
if (flowFile != null) {
session.remove(flowFile);
}
} catch (IOException ioe) {
logger.error(
"Failed to read from Elasticsearch due to {}, this may indicate an error in configuration "
+ "(hosts, username/password, etc.). Routing to retry",
new Object[] { ioe.getLocalizedMessage() }, ioe);
if (flowFile != null) {
session.transfer(flowFile, REL_RETRY);
}
context.yield();
} catch (RetryableException e) {
logger.error(e.getMessage(), new Object[] { e.getLocalizedMessage() }, e);
if (flowFile != null) {
session.transfer(flowFile, REL_RETRY);
}
context.yield();
} catch (Exception e) {
logger.error("Failed to read {} from Elasticsearch due to {}", new Object[] { flowFile,
e.getLocalizedMessage() }, e);
if (flowFile != null) {
session.transfer(flowFile, REL_FAILURE);
}
context.yield();
}
}
private int getPage(final Response getResponse, final URL url, final ProcessContext context,
final ProcessSession session, FlowFile flowFile, final ComponentLog logger,
final long startNanos, boolean targetIsContent)
throws IOException {
List<FlowFile> page = new ArrayList<>();
final int statusCode = getResponse.code();
if (isSuccess(statusCode)) {
ResponseBody body = getResponse.body();
final byte[] bodyBytes = body.bytes();
JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
JsonNode hits = responseJson.get("hits").get("hits");
for(int i = 0; i < hits.size(); i++) {
JsonNode hit = hits.get(i);
String retrievedId = hit.get("_id").asText();
String retrievedIndex = hit.get("_index").asText();
String retrievedType = hit.get("_type").asText();
FlowFile documentFlowFile = null;
if (flowFile != null) {
documentFlowFile = targetIsContent ? session.create(flowFile) : session.clone(flowFile);
} else {
documentFlowFile = session.create();
}
JsonNode source = hit.get("_source");
documentFlowFile = session.putAttribute(documentFlowFile, "es.index", retrievedIndex);
documentFlowFile = session.putAttribute(documentFlowFile, "es.type", retrievedType);
if (targetIsContent) {
documentFlowFile = session.putAttribute(documentFlowFile, "filename", retrievedId);
documentFlowFile = session.write(documentFlowFile, out -> {
out.write(source.toString().getBytes());
});
} else {
Map<String, String> attributes = new HashMap<>();
for(Iterator<Entry<String, JsonNode>> it = source.getFields(); it.hasNext(); ) {
Entry<String, JsonNode> entry = it.next();
attributes.put(ATTRIBUTE_PREFIX + entry.getKey(), entry.getValue().asText());
}
documentFlowFile = session.putAllAttributes(documentFlowFile, attributes);
}
page.add(documentFlowFile);
}
logger.debug("Elasticsearch retrieved " + responseJson.size() + " documents, routing to success");
session.transfer(page, REL_SUCCESS);
} else {
try {
// 5xx -> RETRY, but a server error might last a while, so yield
if (statusCode / 100 == 5) {
throw new RetryableException(String.format("Elasticsearch returned code %s with message %s, transferring flow file to retry. This is likely a server problem, yielding...",
statusCode, getResponse.message()));
} else if (context.hasIncomingConnection()) { // 1xx, 3xx, 4xx -> NO RETRY
throw new UnretryableException(String.format("Elasticsearch returned code %s with message %s, transferring flow file to failure",
statusCode, getResponse.message()));
} else {
logger.warn("Elasticsearch returned code {} with message {}", new Object[]{statusCode, getResponse.message()});
}
} finally {
if (!page.isEmpty()) {
session.remove(page);
page.clear();
}
}
}
// emit provenance event
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
if (!page.isEmpty()) {
if (context.hasNonLoopConnection()) {
page.forEach(f -> session.getProvenanceReporter().fetch(f, url.toExternalForm(), millis));
} else {
page.forEach(f -> session.getProvenanceReporter().receive(f, url.toExternalForm(), millis));
}
}
return page.size();
}
private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
String sort, int pageSize, int fromIndex) throws MalformedURLException {
if (StringUtils.isEmpty(baseUrl)) {
throw new MalformedURLException("Base URL cannot be null");
}
HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder();
builder.addPathSegment(index);
builder.addPathSegment((StringUtils.isEmpty(type)) ? "_all" : type);
builder.addPathSegment("_search");
builder.addQueryParameter(QUERY_QUERY_PARAM, query);
builder.addQueryParameter(SIZE_QUERY_PARAM, String.valueOf(pageSize));
builder.addQueryParameter(FROM_QUERY_PARAM, String.valueOf(fromIndex));
if (!StringUtils.isEmpty(fields)) {
String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields);
}
if (!StringUtils.isEmpty(sort)) {
String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(","));
builder.addQueryParameter(SORT_QUERY_PARAM, trimmedFields);
}
return builder.build().url();
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
/**
* Represents a retryable exception from ElasticSearch.
*/
public class RetryableException extends RuntimeException {
private static final long serialVersionUID = -2755015600102381620L;
public RetryableException() {
super();
}
public RetryableException(String message, Throwable cause) {
super(message, cause);
}
public RetryableException(String message) {
super(message);
}
public RetryableException(Throwable cause) {
super(cause);
}
}

View File

@ -0,0 +1,415 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.codehaus.jackson.JsonNode;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Response;
import okhttp3.ResponseBody;
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@EventDriven
@SupportsBatching
@Tags({ "elasticsearch", "query", "scroll", "read", "get", "http" })
@CapabilityDescription("Scrolls through an Elasticsearch query using the specified connection properties. "
+ "This processor is intended to be run on the primary node, and is designed for scrolling through "
+ "huge result sets, as in the case of a reindex. The state must be cleared before another query "
+ "can be run. Each page of results is returned, wrapped in a JSON object like so: { \"hits\" : [ <doc1>, <doc2>, <docn> ] }. "
+ "Note that the full body of each page of documents will be read into memory before being "
+ "written to a Flow File for transfer.")
@WritesAttributes({
@WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") })
@Stateful(description = "After each successful scroll page, the latest scroll_id is persisted in scrollId as input for the next scroll call. "
+ "Once the entire query is complete, finishedQuery state will be set to true, and the processor will not execute unless this is cleared.", scopes = { Scope.LOCAL })
public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
private static final String FINISHED_QUERY_STATE = "finishedQuery";
private static final String SCROLL_ID_STATE = "scrollId";
private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
private static final String QUERY_QUERY_PARAM = "q";
private static final String SORT_QUERY_PARAM = "sort";
private static final String SCROLL_QUERY_PARAM = "scroll";
private static final String SCROLL_ID_QUERY_PARAM = "scroll_id";
private static final String SIZE_QUERY_PARAM = "size";
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description(
"All FlowFiles that are read from Elasticsearch are routed to this relationship.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description(
"All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming "
+ "flow files will be routed to failure.").build();
public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
.name("scroll-es-query").displayName("Query")
.description("The Lucene-style query to run against ElasticSearch").required(true)
.expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor SCROLL_DURATION = new PropertyDescriptor.Builder()
.name("scroll-es-scroll")
.displayName("Scroll Duration")
.description("The scroll duration is how long each search context is kept in memory.")
.defaultValue("1m")
.required(true)
.expressionLanguageSupported(true)
.addValidator(
StandardValidators.createRegexMatchingValidator(Pattern.compile("[0-9]+(m|h)")))
.build();
public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
.name("scroll-es-index").displayName("Index")
.description("The name of the index to read from").required(true)
.expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("scroll-es-type")
.displayName("Type")
.description(
"The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set "
+ "to _all, the first document matching the identifier across all types will be retrieved.")
.required(false).expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
.name("scroll-es-fields")
.displayName("Fields")
.description(
"A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, "
+ "then the entire document's source will be retrieved.")
.required(false).expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
.name("scroll-es-sort")
.displayName("Sort")
.description(
"A sort parameter (e.g., timestamp:asc). If the Sort property is left blank, "
+ "then the results will be retrieved in document order.")
.required(false).expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
.name("scroll-es-size").displayName("Page Size").defaultValue("20")
.description("Determines how many documents to return per page during scrolling.")
.required(true).expressionLanguageSupported(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
return Collections.unmodifiableSet(relationships);
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(ES_URL);
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
descriptors.add(USERNAME);
descriptors.add(PASSWORD);
descriptors.add(CONNECT_TIMEOUT);
descriptors.add(RESPONSE_TIMEOUT);
descriptors.add(QUERY);
descriptors.add(SCROLL_DURATION);
descriptors.add(PAGE_SIZE);
descriptors.add(INDEX);
descriptors.add(TYPE);
descriptors.add(FIELDS);
descriptors.add(SORT);
return Collections.unmodifiableList(descriptors);
}
@OnScheduled
public void setup(ProcessContext context) {
super.setup(context);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session)
throws ProcessException {
try {
if (isQueryFinished(context.getStateManager())) {
getLogger().trace(
"Query has been marked finished in the state manager. "
+ "To run another query, clear the state.");
return;
}
} catch (IOException e) {
throw new ProcessException("Could not retrieve state", e);
}
OkHttpClient okHttpClient = getClient();
FlowFile flowFile = session.create();
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile)
.getValue();
final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile)
.getValue();
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile)
.getValue();
final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile)
.asInteger().intValue();
final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS)
.evaluateAttributeExpressions(flowFile).getValue() : null;
final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT)
.evaluateAttributeExpressions(flowFile).getValue() : null;
final String scroll = context.getProperty(SCROLL_DURATION).isSet() ? context
.getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue() : null;
// Authentication
final String username = context.getProperty(USERNAME).getValue();
final String password = context.getProperty(PASSWORD).getValue();
final ComponentLog logger = getLogger();
try {
String scrollId = loadScrollId(context.getStateManager());
if (scrollId != null) {
// read the url property from the context
final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL)
.getValue());
final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort,
scrollId, pageSize, scroll);
final long startNanos = System.nanoTime();
final Response getResponse = sendRequestToElasticsearch(okHttpClient, scrollurl,
username, password, "GET", null);
this.getPage(getResponse, scrollurl, context, session, flowFile, logger, startNanos);
} else {
logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[] { index,
docType, query });
// read the url property from the context
final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL)
.getValue());
final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
scrollId, pageSize, scroll);
final long startNanos = System.nanoTime();
final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
username, password, "GET", null);
this.getPage(getResponse, queryUrl, context, session, flowFile, logger, startNanos);
}
} catch (IOException ioe) {
logger.error(
"Failed to read from Elasticsearch due to {}, this may indicate an error in configuration "
+ "(hosts, username/password, etc.).",
new Object[] { ioe.getLocalizedMessage() }, ioe);
session.remove(flowFile);
context.yield();
} catch (Exception e) {
logger.error("Failed to read {} from Elasticsearch due to {}", new Object[] { flowFile,
e.getLocalizedMessage() }, e);
session.transfer(flowFile, REL_FAILURE);
context.yield();
}
}
private void getPage(final Response getResponse, final URL url, final ProcessContext context,
final ProcessSession session, FlowFile flowFile, final ComponentLog logger, final long startNanos)
throws IOException {
final int statusCode = getResponse.code();
if (isSuccess(statusCode)) {
ResponseBody body = getResponse.body();
final byte[] bodyBytes = body.bytes();
JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
String scrollId = responseJson.get("_scroll_id").asText();
StringBuilder builder = new StringBuilder();
builder.append("{ \"hits\" : [");
JsonNode hits = responseJson.get("hits").get("hits");
if (hits.size() == 0) {
finishQuery(context.getStateManager());
session.remove(flowFile);
return;
}
for(int i = 0; i < hits.size(); i++) {
JsonNode hit = hits.get(i);
String retrievedIndex = hit.get("_index").asText();
String retrievedType = hit.get("_type").asText();
JsonNode source = hit.get("_source");
flowFile = session.putAttribute(flowFile, "es.index", retrievedIndex);
flowFile = session.putAttribute(flowFile, "es.type", retrievedType);
builder.append(source.toString());
if (i < hits.size() - 1) {
builder.append(", ");
}
}
builder.append("] }");
logger.debug("Elasticsearch retrieved " + responseJson.size() + " documents, routing to success");
flowFile = session.write(flowFile, out -> {
out.write(builder.toString().getBytes());
});
session.transfer(flowFile, REL_SUCCESS);
saveScrollId(context.getStateManager(), scrollId);
// emit provenance event
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().receive(flowFile, url.toExternalForm(), millis);
} else {
// 5xx -> RETRY, but a server error might last a while, so yield
if (statusCode / 100 == 5) {
logger.warn("Elasticsearch returned code {} with message {}, removing the flow file. This is likely a server problem, yielding...",
new Object[]{statusCode, getResponse.message()});
session.remove(flowFile);
context.yield();
} else {
logger.warn("Elasticsearch returned code {} with message {}", new Object[]{statusCode, getResponse.message()});
session.remove(flowFile);
}
}
}
private boolean isQueryFinished(StateManager stateManager) throws IOException {
final StateMap stateMap = stateManager.getState(Scope.LOCAL);
if (stateMap.getVersion() < 0) {
getLogger().debug("No previous state found");
return false;
}
final String isQueryFinished = stateMap.get(FINISHED_QUERY_STATE);
getLogger().debug("Loaded state with finishedQuery = {}", new Object[] { isQueryFinished });
return "true".equals(isQueryFinished);
}
private String loadScrollId(StateManager stateManager) throws IOException {
final StateMap stateMap = stateManager.getState(Scope.LOCAL);
if (stateMap.getVersion() < 0) {
getLogger().debug("No previous state found");
return null;
}
final String scrollId = stateMap.get(SCROLL_ID_STATE);
getLogger().debug("Loaded state with scrollId {}", new Object[] { scrollId });
return scrollId;
}
private void finishQuery(StateManager stateManager) throws IOException {
Map<String, String> state = new HashMap<>(2);
state.put(FINISHED_QUERY_STATE, "true");
getLogger().debug("Saving state with finishedQuery = true");
stateManager.setState(state, Scope.LOCAL);
}
private void saveScrollId(StateManager stateManager, String scrollId) throws IOException {
Map<String, String> state = new HashMap<>(2);
state.put(SCROLL_ID_STATE, scrollId);
getLogger().debug("Saving state with scrollId of {}", new Object[] { scrollId });
stateManager.setState(state, Scope.LOCAL);
}
private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
String sort, String scrollId, int pageSize, String scroll) throws MalformedURLException {
if (StringUtils.isEmpty(baseUrl)) {
throw new MalformedURLException("Base URL cannot be null");
}
HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder();
if (!StringUtils.isEmpty(scrollId)) {
builder.addPathSegment("_search");
builder.addPathSegment("scroll");
builder.addQueryParameter(SCROLL_ID_QUERY_PARAM, scrollId);
} else {
builder.addPathSegment(index);
builder.addPathSegment((StringUtils.isEmpty(type)) ? "_all" : type);
builder.addPathSegment("_search");
builder.addQueryParameter(QUERY_QUERY_PARAM, query);
builder.addQueryParameter(SIZE_QUERY_PARAM, String.valueOf(pageSize));
if (!StringUtils.isEmpty(fields)) {
String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields);
}
if (!StringUtils.isEmpty(sort)) {
String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(","));
builder.addQueryParameter(SORT_QUERY_PARAM, trimmedFields);
}
}
builder.addQueryParameter(SCROLL_QUERY_PARAM, scroll);
return builder.build().url();
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
/**
* Represents an unrecoverable error from ElasticSearch.
* @author jgresock
*
*/
public class UnretryableException extends RuntimeException {
private static final long serialVersionUID = -4528006567211380914L;
public UnretryableException() {
super();
}
public UnretryableException(String message, Throwable cause) {
super(message, cause);
}
public UnretryableException(String message) {
super(message);
}
public UnretryableException(Throwable cause) {
super(cause);
}
}

View File

@ -16,3 +16,5 @@ org.apache.nifi.processors.elasticsearch.FetchElasticsearch
org.apache.nifi.processors.elasticsearch.PutElasticsearch org.apache.nifi.processors.elasticsearch.PutElasticsearch
org.apache.nifi.processors.elasticsearch.FetchElasticsearchHttp org.apache.nifi.processors.elasticsearch.FetchElasticsearchHttp
org.apache.nifi.processors.elasticsearch.PutElasticsearchHttp org.apache.nifi.processors.elasticsearch.PutElasticsearchHttp
org.apache.nifi.processors.elasticsearch.QueryElasticsearchHttp
org.apache.nifi.processors.elasticsearch.ScrollElasticsearchHttp

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Test;
public class ITQueryElasticsearchHttp {
private TestRunner runner;
@After
public void teardown() {
runner = null;
}
@Test
public void testFetchElasticsearchOnTrigger() throws IOException {
runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
"http://localhost.internal:9200");
runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting");
runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.TYPE, "provenance");
runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.QUERY,
"identifier:2f79eba8839f5976cd0b1e16a0e7fe8d7dd0ceca");
runner.setProperty(QueryElasticsearchHttp.SORT, "timestamp:asc");
runner.setProperty(QueryElasticsearchHttp.FIELDS, "transit_uri,version");
runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "1");
runner.assertValid();
runner.setIncomingConnection(false);
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_SUCCESS, 3);
final MockFlowFile out = runner.getFlowFilesForRelationship(
QueryElasticsearchHttp.REL_SUCCESS).get(0);
assertNotNull(out);
}
@Test
public void testFetchElasticsearchOnTrigger_IncomingFile() throws IOException {
runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
"http://localhost.internal:9200");
runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting");
runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.TYPE, "provenance");
runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.QUERY, "${query}");
runner.setProperty(QueryElasticsearchHttp.SORT, "timestamp:asc");
runner.setProperty(QueryElasticsearchHttp.FIELDS, "transit_uri,version");
runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "1");
runner.assertValid();
Map<String, String> attributes = new HashMap<>();
attributes.put("query", "identifier:2f79eba8839f5976cd0b1e16a0e7fe8d7dd0ceca");
runner.enqueue("".getBytes(), attributes);
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_SUCCESS, 3);
final MockFlowFile out = runner.getFlowFilesForRelationship(
QueryElasticsearchHttp.REL_SUCCESS).get(0);
assertNotNull(out);
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Test;
public class ITScrollElasticsearchHttp {
private TestRunner runner;
@After
public void teardown() {
runner = null;
}
@Test
public void testFetchElasticsearchOnTrigger() throws IOException {
runner = TestRunners.newTestRunner(ScrollElasticsearchHttp.class); // all docs are found
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
"http://ip-172-31-49-152.ec2.internal:9200");
runner.setProperty(ScrollElasticsearchHttp.INDEX, "prod-accounting");
runner.assertNotValid();
runner.setProperty(ScrollElasticsearchHttp.TYPE, "provenance");
runner.assertNotValid();
runner.setProperty(ScrollElasticsearchHttp.QUERY,
"identifier:2f79eba8839f5976cd0b1e16a0e7fe8d7dd0ceca");
runner.setProperty(ScrollElasticsearchHttp.SORT, "timestamp:asc");
runner.setProperty(ScrollElasticsearchHttp.FIELDS, "transit_uri,version");
runner.setProperty(ScrollElasticsearchHttp.PAGE_SIZE, "1");
runner.assertValid();
runner.setIncomingConnection(false);
runner.run(4, true, true);
runner.assertAllFlowFilesTransferred(ScrollElasticsearchHttp.REL_SUCCESS, 3);
final MockFlowFile out = runner.getFlowFilesForRelationship(
ScrollElasticsearchHttp.REL_SUCCESS).get(0);
assertNotNull(out);
}
}

View File

@ -0,0 +1,443 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.stubbing.OngoingStubbing;
import okhttp3.Call;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Protocol;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
public class TestQueryElasticsearchHttp {
private TestRunner runner;
@After
public void teardown() {
runner = null;
}
@Test
public void testQueryElasticsearchOnTrigger_withInput() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.QUERY,
"source:Twitter AND identifier:\"${identifier}\"");
runner.assertValid();
runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
runner.assertValid();
runAndVerifySuccess(true);
}
@Test
public void testQueryElasticsearchOnTrigger_withInput_attributeTarget() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.QUERY,
"source:Twitter AND identifier:\"${identifier}\"");
runner.assertValid();
runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
runner.assertValid();
runner.setProperty(QueryElasticsearchHttp.TARGET,
QueryElasticsearchHttp.TARGET_FLOW_FILE_ATTRIBUTES);
runAndVerifySuccess(false);
final MockFlowFile out = runner.getFlowFilesForRelationship(
QueryElasticsearchHttp.REL_SUCCESS).get(0);
assertNotNull(out);
assertEquals("blah", new String(out.toByteArray()));
assertEquals("Twitter", out.getAttribute("es.result.source"));
}
@Test
public void testQueryElasticsearchOnTrigger_withNoInput() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.QUERY,
"source:Twitter AND identifier:\"${identifier}\"");
runner.assertValid();
runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
runner.assertValid();
runner.setIncomingConnection(false);
runAndVerifySuccess(true);
}
private void runAndVerifySuccess(int expectedResults, boolean targetIsContent) {
runner.enqueue("blah".getBytes(), new HashMap<String, String>() {
{
put("identifier", "28039652140");
}
});
// Running once should page through all 3 docs
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_SUCCESS, expectedResults);
final MockFlowFile out = runner.getFlowFilesForRelationship(
QueryElasticsearchHttp.REL_SUCCESS).get(0);
assertNotNull(out);
if (targetIsContent) {
out.assertAttributeEquals("filename", "abc-97b-ASVsZu_"
+ "vShwtGCJpGOObmuSqUJRUC3L_-SEND-S3");
}
}
// By default, 3 files should go to Success
private void runAndVerifySuccess(boolean targetIsContent) {
runAndVerifySuccess(3, targetIsContent);
}
@Test
public void testQueryElasticsearchOnTriggerWithFields() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
runner.assertValid();
runner.setProperty(QueryElasticsearchHttp.FIELDS, "id,, userinfo.location");
runner.assertValid();
runner.setProperty(QueryElasticsearchHttp.SORT, "timestamp:asc,identifier:desc");
runner.assertValid();
runAndVerifySuccess(true);
}
@Test
public void testQueryElasticsearchOnTriggerWithLimit() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
runner.assertValid();
runner.setProperty(QueryElasticsearchHttp.FIELDS, "id,, userinfo.location");
runner.assertValid();
runner.setProperty(QueryElasticsearchHttp.SORT, "timestamp:asc,identifier:desc");
runner.assertValid();
runner.setProperty(QueryElasticsearchHttp.LIMIT, "2");
runAndVerifySuccess(2, true);
}
@Test
public void testQueryElasticsearchOnTriggerWithServerErrorRetry() throws IOException {
QueryElasticsearchHttpTestProcessor processor = new QueryElasticsearchHttpTestProcessor();
processor.setStatus(500, "Server error");
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setValidateExpressionUsage(true);
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
runner.enqueue("".getBytes(), new HashMap<String, String>() {
{
put("identifier", "28039652140");
}
});
runner.run(1, true, true);
// This test generates a HTTP 500 "Server error"
runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_RETRY, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(
QueryElasticsearchHttp.REL_RETRY).get(0);
assertNotNull(out);
}
@Test
public void testQueryElasticsearchOnTriggerWithServerFail() throws IOException {
QueryElasticsearchHttpTestProcessor processor = new QueryElasticsearchHttpTestProcessor();
processor.setStatus(100, "Should fail");
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setValidateExpressionUsage(true);
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
runner.enqueue("".getBytes(), new HashMap<String, String>() {
{
put("identifier", "28039652140");
}
});
runner.run(1, true, true);
// This test generates a HTTP 100 "Should fail"
runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_FAILURE, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(
QueryElasticsearchHttp.REL_FAILURE).get(0);
assertNotNull(out);
}
@Test
public void testQueryElasticsearchOnTriggerWithIOException() throws IOException {
QueryElasticsearchHttpTestProcessor processor = new QueryElasticsearchHttpTestProcessor();
processor.setExceptionToThrow(new IOException("Error reading from disk"));
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setValidateExpressionUsage(true);
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
runner.enqueue("".getBytes(), new HashMap<String, String>() {
{
put("identifier", "28039652140");
}
});
runner.run(1, true, true);
// This test generates a HTTP 100 "Should fail"
runner.assertAllFlowFilesTransferred(QueryElasticsearchHttp.REL_RETRY, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(
QueryElasticsearchHttp.REL_RETRY).get(0);
assertNotNull(out);
}
@Test
public void testQueryElasticsearchOnTriggerWithServerFailAfterSuccess() throws IOException {
QueryElasticsearchHttpTestProcessor processor = new QueryElasticsearchHttpTestProcessor();
processor.setStatus(100, "Should fail", 2);
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setValidateExpressionUsage(true);
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
runner.enqueue("".getBytes(), new HashMap<String, String>() {
{
put("identifier", "28039652140");
}
});
runner.run(1, true, true);
// This test generates a HTTP 100 "Should fail"
runner.assertTransferCount(QueryElasticsearchHttp.REL_SUCCESS, 2);
runner.assertTransferCount(QueryElasticsearchHttp.REL_FAILURE, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(
QueryElasticsearchHttp.REL_FAILURE).get(0);
assertNotNull(out);
}
@Test
public void testQueryElasticsearchOnTriggerWithServerFailNoIncomingFlowFile() throws IOException {
QueryElasticsearchHttpTestProcessor processor = new QueryElasticsearchHttpTestProcessor();
processor.setStatus(100, "Should fail", 1);
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setValidateExpressionUsage(true);
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
runner.setIncomingConnection(false);
runner.run(1, true, true);
// This test generates a HTTP 100 with no incoming flow file, so nothing should be transferred
processor.getRelationships().forEach(relationship -> runner.assertTransferCount(relationship, 0));
runner.assertTransferCount(QueryElasticsearchHttp.REL_FAILURE, 0);
}
@Test
public void testSetupSecureClient() throws Exception {
QueryElasticsearchHttpTestProcessor processor = new QueryElasticsearchHttpTestProcessor();
runner = TestRunners.newTestRunner(processor);
SSLContextService sslService = mock(SSLContextService.class);
when(sslService.getIdentifier()).thenReturn("ssl-context");
runner.addControllerService("ssl-context", sslService);
runner.enableControllerService(sslService);
runner.setProperty(QueryElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setValidateExpressionUsage(true);
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
// Allow time for the controller service to fully initialize
Thread.sleep(500);
runner.enqueue("".getBytes(), new HashMap<String, String>() {
{
put("doc_id", "28039652140");
}
});
runner.run(1, true, true);
}
/**
* A Test class that extends the processor in order to inject/mock behavior
*/
private static class QueryElasticsearchHttpTestProcessor extends QueryElasticsearchHttp {
Exception exceptionToThrow = null;
OkHttpClient client;
int goodStatusCode = 200;
String goodStatusMessage = "OK";
int badStatusCode;
String badStatusMessage;
int runNumber;
List<String> pages = Arrays.asList(getDoc("query-page1.json"), getDoc("query-page2.json"),
getDoc("query-page3.json"));
public void setExceptionToThrow(Exception exceptionToThrow) {
this.exceptionToThrow = exceptionToThrow;
}
/**
* Sets the status code and message for the 1st query
*
* @param code
* The status code to return
* @param message
* The status message
*/
void setStatus(int code, String message) {
this.setStatus(code, message, 1);
}
/**
* Sets the status code and message for the runNumber-th query
*
* @param code
* The status code to return
* @param message
* The status message
* @param runNumber
* The run number for which to set this status
*/
void setStatus(int code, String message, int runNumber) {
badStatusCode = code;
badStatusMessage = message;
this.runNumber = runNumber;
}
@Override
protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
client = mock(OkHttpClient.class);
OngoingStubbing<Call> stub = when(client.newCall(any(Request.class)));
for (int i = 0; i < pages.size(); i++) {
String page = pages.get(i);
if (runNumber == i + 1) {
stub = mockReturnDocument(stub, page, badStatusCode, badStatusMessage);
} else {
stub = mockReturnDocument(stub, page, goodStatusCode, goodStatusMessage);
}
}
}
private OngoingStubbing<Call> mockReturnDocument(OngoingStubbing<Call> stub,
final String document, int statusCode, String statusMessage) {
return stub.thenAnswer(new Answer<Call>() {
@Override
public Call answer(InvocationOnMock invocationOnMock) throws Throwable {
Request realRequest = (Request) invocationOnMock.getArguments()[0];
Response mockResponse = new Response.Builder()
.request(realRequest)
.protocol(Protocol.HTTP_1_1)
.code(statusCode)
.message(statusMessage)
.body(ResponseBody.create(MediaType.parse("application/json"), document))
.build();
final Call call = mock(Call.class);
if (exceptionToThrow != null) {
when(call.execute()).thenThrow(exceptionToThrow);
} else {
when(call.execute()).thenReturn(mockResponse);
}
return call;
}
});
}
protected OkHttpClient getClient() {
return client;
}
}
private static String getDoc(String filename) {
try {
return IOUtils.toString(QueryElasticsearchHttp.class.getClassLoader()
.getResourceAsStream(filename));
} catch (IOException e) {
System.out.println("Error reading document " + filename);
return "";
}
}
}

View File

@ -0,0 +1,398 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.stubbing.OngoingStubbing;
import okhttp3.Call;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Protocol;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
public class TestScrollElasticsearchHttp {
private TestRunner runner;
@After
public void teardown() {
runner = null;
}
@Test
public void testScrollElasticsearchOnTrigger_withNoInput() throws IOException {
runner = TestRunners.newTestRunner(new ScrollElasticsearchHttpTestProcessor());
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
runner.assertNotValid();
runner.setProperty(ScrollElasticsearchHttp.QUERY,
"source:WZ AND identifier:\"${identifier}\"");
runner.assertValid();
runner.setProperty(ScrollElasticsearchHttp.PAGE_SIZE, "2");
runner.assertValid();
runner.setIncomingConnection(false);
runAndVerifySuccess();
}
private void runAndVerifySuccess() {
runner.enqueue("".getBytes(), new HashMap<String, String>() {
{
put("identifier", "28039652140");
}
});
// Must run once for each of the 3 pages
runner.run(3, true, true);
runner.assertAllFlowFilesTransferred(ScrollElasticsearchHttp.REL_SUCCESS, 2);
final MockFlowFile out = runner.getFlowFilesForRelationship(
ScrollElasticsearchHttp.REL_SUCCESS).get(0);
assertNotNull(out);
int numHits = runner.getFlowFilesForRelationship(
ScrollElasticsearchHttp.REL_SUCCESS).stream().map(ff -> {
String page = new String(ff.toByteArray());
return StringUtils.countMatches(page, "{\"timestamp\"");
})
.reduce((a, b) -> a + b).get();
Assert.assertEquals(3, numHits);
}
@Test
public void testScrollElasticsearchOnTriggerWithFields() throws IOException {
runner = TestRunners.newTestRunner(new ScrollElasticsearchHttpTestProcessor());
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
runner.assertNotValid();
runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
runner.assertValid();
runner.setProperty(ScrollElasticsearchHttp.FIELDS, "id,, userinfo.location");
runner.assertValid();
runner.setProperty(ScrollElasticsearchHttp.SORT, "timestamp:asc,identifier:desc");
runner.assertValid();
runner.setIncomingConnection(false);
runAndVerifySuccess();
}
@Test
public void testScrollElasticsearchOnTriggerWithServerFail() throws IOException {
ScrollElasticsearchHttpTestProcessor processor = new ScrollElasticsearchHttpTestProcessor();
processor.setStatus(100, "Should fail");
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
runner.setValidateExpressionUsage(true);
runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
runner.setIncomingConnection(false);
runner.enqueue("".getBytes(), new HashMap<String, String>() {
{
put("identifier", "28039652140");
}
});
runner.run(1, true, true);
// This test generates a HTTP 100 "Should fail"
runner.assertTransferCount(ScrollElasticsearchHttp.REL_FAILURE, 0);
runner.assertTransferCount(ScrollElasticsearchHttp.REL_SUCCESS, 0);
}
@Test
public void testScrollElasticsearchOnTriggerWithServerRetry() throws IOException {
ScrollElasticsearchHttpTestProcessor processor = new ScrollElasticsearchHttpTestProcessor();
processor.setStatus(500, "Internal error");
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
runner.setValidateExpressionUsage(true);
runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
runner.setIncomingConnection(false);
runner.enqueue("".getBytes(), new HashMap<String, String>() {
{
put("identifier", "28039652140");
}
});
runner.run(1, true, true);
// This test generates a HTTP 500 "Internal error"
runner.assertTransferCount(ScrollElasticsearchHttp.REL_FAILURE, 0);
runner.assertTransferCount(ScrollElasticsearchHttp.REL_SUCCESS, 0);
}
@Test
public void testScrollElasticsearchOnTriggerWithServerFailAfterSuccess() throws IOException {
ScrollElasticsearchHttpTestProcessor processor = new ScrollElasticsearchHttpTestProcessor();
processor.setStatus(100, "Should fail", 2);
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
runner.setValidateExpressionUsage(true);
runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
runner.enqueue("".getBytes(), new HashMap<String, String>() {
{
put("identifier", "28039652140");
}
});
runner.setIncomingConnection(false);
runner.run(1, true, true);
// This test generates a HTTP 100 "Should fail"
runner.assertTransferCount(ScrollElasticsearchHttp.REL_SUCCESS, 1);
runner.assertTransferCount(ScrollElasticsearchHttp.REL_FAILURE, 0);
}
@Test
public void testScrollElasticsearchOnTriggerWithServerFailNoIncomingFlowFile() throws IOException {
ScrollElasticsearchHttpTestProcessor processor = new ScrollElasticsearchHttpTestProcessor();
processor.setStatus(100, "Should fail", 1);
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
runner.setValidateExpressionUsage(true);
runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
runner.setIncomingConnection(false);
runner.run(1, true, true);
// This test generates a HTTP 100 with no incoming flow file, so nothing should be transferred
processor.getRelationships().forEach(relationship -> runner.assertTransferCount(relationship, 0));
runner.assertTransferCount(ScrollElasticsearchHttp.REL_FAILURE, 0);
}
@Test
public void testSetupSecureClient() throws Exception {
ScrollElasticsearchHttpTestProcessor processor = new ScrollElasticsearchHttpTestProcessor();
runner = TestRunners.newTestRunner(processor);
SSLContextService sslService = mock(SSLContextService.class);
when(sslService.getIdentifier()).thenReturn("ssl-context");
runner.addControllerService("ssl-context", sslService);
runner.enableControllerService(sslService);
runner.setProperty(ScrollElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
runner.setValidateExpressionUsage(true);
runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
runner.setIncomingConnection(false);
// Allow time for the controller service to fully initialize
Thread.sleep(500);
runner.enqueue("".getBytes(), new HashMap<String, String>() {
{
put("doc_id", "28039652140");
}
});
runner.run(1, true, true);
}
@Test
public void testScrollElasticsearchOnTriggerWithIOException() throws IOException {
ScrollElasticsearchHttpTestProcessor processor = new ScrollElasticsearchHttpTestProcessor();
processor.setExceptionToThrow(new IOException("Error reading from disk"));
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
runner.setValidateExpressionUsage(true);
runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
runner.enqueue("".getBytes(), new HashMap<String, String>() {
{
put("identifier", "28039652140");
}
});
runner.run(1, true, true);
// This test generates a HTTP 100 "Should fail"
runner.assertTransferCount(ScrollElasticsearchHttp.REL_SUCCESS, 0);
runner.assertTransferCount(ScrollElasticsearchHttp.REL_FAILURE, 0);
}
@Test
public void testScrollElasticsearchOnTriggerWithOtherException() throws IOException {
ScrollElasticsearchHttpTestProcessor processor = new ScrollElasticsearchHttpTestProcessor();
processor.setExceptionToThrow(new IllegalArgumentException("Error reading from disk"));
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
runner.setValidateExpressionUsage(true);
runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
runner.enqueue("".getBytes(), new HashMap<String, String>() {
{
put("identifier", "28039652140");
}
});
runner.run(1, true, true);
// This test generates a HTTP 100 "Should fail"
runner.assertTransferCount(ScrollElasticsearchHttp.REL_SUCCESS, 0);
runner.assertTransferCount(ScrollElasticsearchHttp.REL_FAILURE, 1);
}
/**
* A Test class that extends the processor in order to inject/mock behavior
*/
private static class ScrollElasticsearchHttpTestProcessor extends ScrollElasticsearchHttp {
Exception exceptionToThrow = null;
OkHttpClient client;
int goodStatusCode = 200;
String goodStatusMessage = "OK";
int badStatusCode;
String badStatusMessage;
int runNumber;
List<String> pages = Arrays.asList(getDoc("scroll-page1.json"),
getDoc("scroll-page2.json"), getDoc("scroll-page3.json"));
public void setExceptionToThrow(Exception exceptionToThrow) {
this.exceptionToThrow = exceptionToThrow;
}
/**
* Sets the status code and message for the 1st query
*
* @param code
* The status code to return
* @param message
* The status message
*/
void setStatus(int code, String message) {
this.setStatus(code, message, 1);
}
/**
* Sets the status code and message for the runNumber-th query
*
* @param code
* The status code to return
* @param message
* The status message
* @param runNumber
* The run number for which to set this status
*/
void setStatus(int code, String message, int runNumber) {
badStatusCode = code;
badStatusMessage = message;
this.runNumber = runNumber;
}
@Override
protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
client = mock(OkHttpClient.class);
OngoingStubbing<Call> stub = when(client.newCall(any(Request.class)));
for (int i = 0; i < pages.size(); i++) {
String page = pages.get(i);
if (runNumber == i + 1) {
stub = mockReturnDocument(stub, page, badStatusCode, badStatusMessage);
} else {
stub = mockReturnDocument(stub, page, goodStatusCode, goodStatusMessage);
}
}
}
private OngoingStubbing<Call> mockReturnDocument(OngoingStubbing<Call> stub,
final String document, int statusCode, String statusMessage) {
return stub.thenAnswer(new Answer<Call>() {
@Override
public Call answer(InvocationOnMock invocationOnMock) throws Throwable {
Request realRequest = (Request) invocationOnMock.getArguments()[0];
Response mockResponse = new Response.Builder()
.request(realRequest)
.protocol(Protocol.HTTP_1_1)
.code(statusCode)
.message(statusMessage)
.body(ResponseBody.create(MediaType.parse("application/json"), document))
.build();
final Call call = mock(Call.class);
if (exceptionToThrow != null) {
when(call.execute()).thenThrow(exceptionToThrow);
} else {
when(call.execute()).thenReturn(mockResponse);
}
return call;
}
});
}
protected OkHttpClient getClient() {
return client;
}
}
private static String getDoc(String filename) {
try {
return IOUtils.toString(ScrollElasticsearchHttp.class.getClassLoader()
.getResourceAsStream(filename));
} catch (IOException e) {
System.out.println("Error reading document " + filename);
return "";
}
}
}

View File

@ -0,0 +1,57 @@
{
"took": 3,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 3,
"max_score": null,
"hits": [
{
"_index": "myindex",
"_type": "provenance",
"_id": "abc-97b-ASVsZu_vShwtGCJpGOObmuSqUJRUC3L_-SEND-S3",
"_score": null,
"_source": {
"timestamp": "2016-07-22T14:47:08.102Z",
"event_type": "SEND",
"source": "Twitter",
"identifier": "abc-97b",
"transit_type": "S3",
"transit_uri": "file://cluster2/data/outgoing/S3/abc-97b.zip",
"object_type": "Provenance Record",
"version": "ASVsZu_vShwtGCJpGOObmuSqUJRUC3L_",
"file_size": "3645525"
},
"sort": [
1469198828102
]
},
{
"_index": "myindex",
"_type": "provenance",
"_id": "abc-a78-SjJkrwnv6edIRqJChEYzrE7PeT1hzioz-SEND-S3",
"_score": null,
"_source": {
"timestamp": "2016-07-22T14:47:08.101Z",
"event_type": "SEND",
"source": "Twitter",
"identifier": "abc-a78",
"transit_type": "S3",
"transit_uri": "file://cluster2/data/outgoing/S3/abc-a78.zip",
"object_type": "Provenance Record",
"version": "SjJkrwnv6edIRqJChEYzrE7PeT1hzioz",
"file_size": "4480294"
},
"sort": [
1469198828101
]
}
]
}
}

View File

@ -0,0 +1,36 @@
{
"took": 3,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 3,
"max_score": null,
"hits": [
{
"_index": "myindex",
"_type": "provenance",
"_id": "abc-42a-ArPsIlGBKqDvfL6qQZOVpmDwUEB.nynh-SEND-S3",
"_score": null,
"_source": {
"timestamp": "2016-07-22T14:47:08.101Z",
"event_type": "SEND",
"source": "Twitter",
"identifier": "abc-42a",
"transit_type": "S3",
"transit_uri": "file://cluster2/data/outgoing/S3/abc-42a.zip",
"object_type": "Provenance Record",
"version": "ArPsIlGBKqDvfL6qQZOVpmDwUEB.nynh",
"file_size": "18206872"
},
"sort": [
1469198828101
]
}
]
}
}

View File

@ -0,0 +1,14 @@
{
"took": 6,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 3,
"max_score": null,
"hits": [ ]
}
}

View File

@ -0,0 +1,56 @@
{
"_scroll_id": "cXVlcnlUaGVuRmV0Y2g7NTsyMDU3NjU6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3NjY6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3Njg6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3Njk6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3Njc6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzswOw==",
"took": 4,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 3,
"max_score": null,
"hits": [
{
"_index": "myindex",
"_type": "provenance",
"_id": "abc-97b-ASVsZu_vShwtGCJpGOObmuSqUJRUC3L_-SEND-S3",
"_score": null,
"_source": {
"timestamp": "2016-07-22T14:47:08.102Z",
"event_type": "SEND",
"source": "Twitter",
"identifier": "abc-97b",
"transit_type": "S3",
"transit_uri": "file://cluster2/data/outgoing/S3/abc-97b.zip",
"object_type": "Provenance Record",
"version": "ASVsZu_vShwtGCJpGOObmuSqUJRUC3L_",
"file_size": "3645525"
},
"sort": [
1469198828102
]
},
{
"_index": "myindex",
"_type": "provenance",
"_id": "abc-a78-SjJkrwnv6edIRqJChEYzrE7PeT1hzioz-SEND-S3",
"_score": null,
"_source": {
"timestamp": "2016-07-22T14:47:08.101Z",
"event_type": "SEND",
"source": "Twitter",
"identifier": "abc-a78",
"transit_type": "S3",
"transit_uri": "file://cluster2/data/outgoing/S3/abc-a78.zip",
"object_type": "Provenance Record",
"version": "SjJkrwnv6edIRqJChEYzrE7PeT1hzioz",
"file_size": "4480294"
},
"sort": [
1469198828101
]
}
]
}
}

View File

@ -0,0 +1,36 @@
{
"_scroll_id": "dXVlcnlUaGVuRmV0Y2g7NTsyMDU3NjU6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3NjY6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3Njg6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3Njk6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3Njc6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzswOw==",
"took": 4,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 3,
"max_score": null,
"hits": [
{
"_index": "myindex",
"_type": "provenance",
"_id": "abc-97b-ASVsZu_vShwtGCJpGOObmuSqUJRUC3L_-SEND-S3",
"_score": null,
"_source": {
"timestamp": "2016-07-22T14:47:08.102Z",
"event_type": "SEND",
"source": "Twitter",
"identifier": "abc-97b",
"transit_type": "S3",
"transit_uri": "file://cluster2/data/outgoing/S3/abc-97b.zip",
"object_type": "Provenance Record",
"version": "ASVsZu_vShwtGCJpGOObmuSqUJRUC3L_",
"file_size": "3645525"
},
"sort": [
1469198828102
]
}
]
}
}

View File

@ -0,0 +1,15 @@
{
"_scroll_id": "eXVlcnlUaGVuRmV0Y2g7NTsyMDU3NjU6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3NjY6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3Njg6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3Njk6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzsyMDU3Njc6WUlIQVpmWTlRZWl4aURSWUVVR0lXdzswOw==",
"took": 4,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 3,
"max_score": null,
"hits": [ ]
}
}