mirror of https://github.com/apache/nifi.git
NIFI-4218: Dynamic properties as query parameters in ESHttp processors
This closes #2049. Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
69a08e78c2
commit
6b5015e39b
|
@ -50,6 +50,12 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||||
*/
|
*/
|
||||||
public abstract class AbstractElasticsearchHttpProcessor extends AbstractElasticsearchProcessor {
|
public abstract class AbstractElasticsearchHttpProcessor extends AbstractElasticsearchProcessor {
|
||||||
|
|
||||||
|
static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
|
||||||
|
static final String QUERY_QUERY_PARAM = "q";
|
||||||
|
static final String SORT_QUERY_PARAM = "sort";
|
||||||
|
static final String SIZE_QUERY_PARAM = "size";
|
||||||
|
|
||||||
|
|
||||||
public static final PropertyDescriptor ES_URL = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor ES_URL = new PropertyDescriptor.Builder()
|
||||||
.name("elasticsearch-http-url")
|
.name("elasticsearch-http-url")
|
||||||
.displayName("Elasticsearch URL")
|
.displayName("Elasticsearch URL")
|
||||||
|
@ -97,6 +103,17 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
|
||||||
|
|
||||||
private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = new AtomicReference<>();
|
private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = new AtomicReference<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
|
||||||
|
return new PropertyDescriptor.Builder()
|
||||||
|
.name(propertyDescriptorName)
|
||||||
|
.required(false)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.dynamic(true)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
|
protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
|
||||||
okHttpClientAtomicReference.set(null);
|
okHttpClientAtomicReference.set(null);
|
||||||
|
|
|
@ -21,6 +21,7 @@ import okhttp3.OkHttpClient;
|
||||||
import okhttp3.Response;
|
import okhttp3.Response;
|
||||||
import okhttp3.ResponseBody;
|
import okhttp3.ResponseBody;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||||
|
@ -37,9 +38,9 @@ import org.apache.nifi.processor.ProcessSession;
|
||||||
import org.apache.nifi.processor.Relationship;
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.stream.io.ByteArrayInputStream;
|
|
||||||
import org.codehaus.jackson.JsonNode;
|
import org.codehaus.jackson.JsonNode;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
@ -47,6 +48,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
@ -65,10 +67,13 @@ import java.util.stream.Stream;
|
||||||
@WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
|
@WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
|
||||||
@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type")
|
@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type")
|
||||||
})
|
})
|
||||||
|
@DynamicProperty(
|
||||||
|
name = "A URL query parameter",
|
||||||
|
value = "The value to set it to",
|
||||||
|
supportsExpressionLanguage = true,
|
||||||
|
description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing")
|
||||||
public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
||||||
|
|
||||||
private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
|
|
||||||
|
|
||||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||||
.name("success")
|
.name("success")
|
||||||
.description("All FlowFiles that are read from Elasticsearch are routed to this relationship.")
|
.description("All FlowFiles that are read from Elasticsearch are routed to this relationship.")
|
||||||
|
@ -212,7 +217,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
||||||
|
|
||||||
// read the url property from the context
|
// read the url property from the context
|
||||||
final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
|
final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
|
||||||
final URL url = buildRequestURL(urlstr, docId, index, docType, fields);
|
final URL url = buildRequestURL(urlstr, docId, index, docType, fields, context);
|
||||||
final long startNanos = System.nanoTime();
|
final long startNanos = System.nanoTime();
|
||||||
|
|
||||||
getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null);
|
getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null);
|
||||||
|
@ -304,7 +309,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields) throws MalformedURLException {
|
private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields, ProcessContext context) throws MalformedURLException {
|
||||||
if (StringUtils.isEmpty(baseUrl)) {
|
if (StringUtils.isEmpty(baseUrl)) {
|
||||||
throw new MalformedURLException("Base URL cannot be null");
|
throw new MalformedURLException("Base URL cannot be null");
|
||||||
}
|
}
|
||||||
|
@ -317,6 +322,16 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
||||||
builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields);
|
builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Find the user-added properties and set them as query parameters on the URL
|
||||||
|
for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
|
||||||
|
PropertyDescriptor pd = property.getKey();
|
||||||
|
if (pd.isDynamic()) {
|
||||||
|
if (property.getValue() != null) {
|
||||||
|
builder.addQueryParameter(pd.getName(), context.getProperty(pd).evaluateAttributeExpressions().getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return builder.build().url();
|
return builder.build().url();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,12 +16,14 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.elasticsearch;
|
package org.apache.nifi.processors.elasticsearch;
|
||||||
|
|
||||||
|
import okhttp3.HttpUrl;
|
||||||
import okhttp3.MediaType;
|
import okhttp3.MediaType;
|
||||||
import okhttp3.OkHttpClient;
|
import okhttp3.OkHttpClient;
|
||||||
import okhttp3.RequestBody;
|
import okhttp3.RequestBody;
|
||||||
import okhttp3.Response;
|
import okhttp3.Response;
|
||||||
import okhttp3.ResponseBody;
|
import okhttp3.ResponseBody;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||||
|
@ -39,13 +41,12 @@ import org.apache.nifi.processor.ProcessSession;
|
||||||
import org.apache.nifi.processor.Relationship;
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.stream.io.ByteArrayInputStream;
|
|
||||||
import org.apache.nifi.util.StringUtils;
|
import org.apache.nifi.util.StringUtils;
|
||||||
import org.codehaus.jackson.JsonNode;
|
import org.codehaus.jackson.JsonNode;
|
||||||
import org.codehaus.jackson.node.ArrayNode;
|
import org.codehaus.jackson.node.ArrayNode;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.MalformedURLException;
|
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -54,6 +55,7 @@ import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import static org.apache.commons.lang3.StringUtils.trimToEmpty;
|
import static org.apache.commons.lang3.StringUtils.trimToEmpty;
|
||||||
|
@ -65,6 +67,11 @@ import static org.apache.commons.lang3.StringUtils.trimToEmpty;
|
||||||
@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http"})
|
@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http"})
|
||||||
@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as "
|
@CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as "
|
||||||
+ "the index to insert into and the type of the document.")
|
+ "the index to insert into and the type of the document.")
|
||||||
|
@DynamicProperty(
|
||||||
|
name = "A URL query parameter",
|
||||||
|
value = "The value to set it to",
|
||||||
|
supportsExpressionLanguage = true,
|
||||||
|
description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing")
|
||||||
public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
||||||
|
|
||||||
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
|
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
|
||||||
|
@ -223,14 +230,18 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
||||||
|
|
||||||
final StringBuilder sb = new StringBuilder();
|
final StringBuilder sb = new StringBuilder();
|
||||||
final String baseUrl = trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
|
final String baseUrl = trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
|
||||||
final URL url;
|
HttpUrl.Builder urlBuilder = HttpUrl.parse(baseUrl).newBuilder().addPathSegment("_bulk");
|
||||||
try {
|
|
||||||
url = new URL((baseUrl.endsWith("/") ? baseUrl : baseUrl + "/") + "_bulk");
|
// Find the user-added properties and set them as query parameters on the URL
|
||||||
} catch (MalformedURLException mue) {
|
for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
|
||||||
// Since we have a URL validator, something has gone very wrong, throw a ProcessException
|
PropertyDescriptor pd = property.getKey();
|
||||||
context.yield();
|
if (pd.isDynamic()) {
|
||||||
throw new ProcessException(mue);
|
if (property.getValue() != null) {
|
||||||
|
urlBuilder = urlBuilder.addQueryParameter(pd.getName(), context.getProperty(pd).evaluateAttributeExpressions().getValue());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
final URL url = urlBuilder.build().url();
|
||||||
|
|
||||||
for (FlowFile file : flowFiles) {
|
for (FlowFile file : flowFiles) {
|
||||||
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue();
|
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue();
|
||||||
|
|
|
@ -16,11 +16,13 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.elasticsearch;
|
package org.apache.nifi.processors.elasticsearch;
|
||||||
|
|
||||||
|
import okhttp3.HttpUrl;
|
||||||
import okhttp3.MediaType;
|
import okhttp3.MediaType;
|
||||||
import okhttp3.OkHttpClient;
|
import okhttp3.OkHttpClient;
|
||||||
import okhttp3.RequestBody;
|
import okhttp3.RequestBody;
|
||||||
import okhttp3.Response;
|
import okhttp3.Response;
|
||||||
import okhttp3.ResponseBody;
|
import okhttp3.ResponseBody;
|
||||||
|
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
@ -66,7 +68,6 @@ import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.math.BigInteger;
|
import java.math.BigInteger;
|
||||||
import java.net.MalformedURLException;
|
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -87,6 +88,11 @@ import static org.apache.commons.lang3.StringUtils.trimToEmpty;
|
||||||
+ "the index to insert into and the type of the document, as well as the operation type (index, upsert, delete, etc.). Note: The Bulk API is used to "
|
+ "the index to insert into and the type of the document, as well as the operation type (index, upsert, delete, etc.). Note: The Bulk API is used to "
|
||||||
+ "send the records. This means that the entire contents of the incoming flow file are read into memory, and each record is transformed into a JSON document "
|
+ "send the records. This means that the entire contents of the incoming flow file are read into memory, and each record is transformed into a JSON document "
|
||||||
+ "which is added to a single HTTP request body. For very large flow files (files with a large number of records, e.g.), this could cause memory usage issues.")
|
+ "which is added to a single HTTP request body. For very large flow files (files with a large number of records, e.g.), this could cause memory usage issues.")
|
||||||
|
@DynamicProperty(
|
||||||
|
name = "A URL query parameter",
|
||||||
|
value = "The value to set it to",
|
||||||
|
supportsExpressionLanguage = true,
|
||||||
|
description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing")
|
||||||
public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcessor {
|
public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcessor {
|
||||||
|
|
||||||
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
|
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
|
||||||
|
@ -239,14 +245,18 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
|
||||||
final ComponentLog logger = getLogger();
|
final ComponentLog logger = getLogger();
|
||||||
|
|
||||||
final String baseUrl = trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
|
final String baseUrl = trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
|
||||||
final URL url;
|
HttpUrl.Builder urlBuilder = HttpUrl.parse(baseUrl).newBuilder().addPathSegment("_bulk");
|
||||||
try {
|
|
||||||
url = new URL((baseUrl.endsWith("/") ? baseUrl : baseUrl + "/") + "_bulk");
|
// Find the user-added properties and set them as query parameters on the URL
|
||||||
} catch (MalformedURLException mue) {
|
for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
|
||||||
// Since we have a URL validator, something has gone very wrong, throw a ProcessException
|
PropertyDescriptor pd = property.getKey();
|
||||||
context.yield();
|
if (pd.isDynamic()) {
|
||||||
throw new ProcessException(mue);
|
if (property.getValue() != null) {
|
||||||
|
urlBuilder = urlBuilder.addQueryParameter(pd.getName(), context.getProperty(pd).evaluateAttributeExpressions().getValue());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
final URL url = urlBuilder.build().url();
|
||||||
|
|
||||||
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
|
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
if (StringUtils.isEmpty(index)) {
|
if (StringUtils.isEmpty(index)) {
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.elasticsearch;
|
package org.apache.nifi.processors.elasticsearch;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
@ -33,6 +34,7 @@ import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||||
|
@ -49,7 +51,6 @@ import org.apache.nifi.processor.ProcessSession;
|
||||||
import org.apache.nifi.processor.Relationship;
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.stream.io.ByteArrayInputStream;
|
|
||||||
import org.codehaus.jackson.JsonNode;
|
import org.codehaus.jackson.JsonNode;
|
||||||
|
|
||||||
import okhttp3.HttpUrl;
|
import okhttp3.HttpUrl;
|
||||||
|
@ -73,13 +74,14 @@ import okhttp3.ResponseBody;
|
||||||
@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type"),
|
@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type"),
|
||||||
@WritesAttribute(attribute = "es.result.*", description = "If Target is 'Flow file attributes', the JSON attributes of "
|
@WritesAttribute(attribute = "es.result.*", description = "If Target is 'Flow file attributes', the JSON attributes of "
|
||||||
+ "each result will be placed into corresponding attributes with this prefix.") })
|
+ "each result will be placed into corresponding attributes with this prefix.") })
|
||||||
|
@DynamicProperty(
|
||||||
|
name = "A URL query parameter",
|
||||||
|
value = "The value to set it to",
|
||||||
|
supportsExpressionLanguage = true,
|
||||||
|
description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing")
|
||||||
public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
||||||
|
|
||||||
private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
|
|
||||||
private static final String QUERY_QUERY_PARAM = "q";
|
|
||||||
private static final String SORT_QUERY_PARAM = "sort";
|
|
||||||
private static final String FROM_QUERY_PARAM = "from";
|
private static final String FROM_QUERY_PARAM = "from";
|
||||||
private static final String SIZE_QUERY_PARAM = "size";
|
|
||||||
|
|
||||||
public static final String TARGET_FLOW_FILE_CONTENT = "Flow file content";
|
public static final String TARGET_FLOW_FILE_CONTENT = "Flow file content";
|
||||||
public static final String TARGET_FLOW_FILE_ATTRIBUTES = "Flow file attributes";
|
public static final String TARGET_FLOW_FILE_ATTRIBUTES = "Flow file attributes";
|
||||||
|
@ -281,7 +283,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
|
final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
|
||||||
mPageSize, fromIndex);
|
mPageSize, fromIndex, context);
|
||||||
|
|
||||||
final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
|
final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
|
||||||
username, password, "GET", null);
|
username, password, "GET", null);
|
||||||
|
@ -403,7 +405,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
|
private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
|
||||||
String sort, int pageSize, int fromIndex) throws MalformedURLException {
|
String sort, int pageSize, int fromIndex, ProcessContext context) throws MalformedURLException {
|
||||||
if (StringUtils.isEmpty(baseUrl)) {
|
if (StringUtils.isEmpty(baseUrl)) {
|
||||||
throw new MalformedURLException("Base URL cannot be null");
|
throw new MalformedURLException("Base URL cannot be null");
|
||||||
}
|
}
|
||||||
|
@ -425,6 +427,16 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
||||||
builder.addQueryParameter(SORT_QUERY_PARAM, trimmedFields);
|
builder.addQueryParameter(SORT_QUERY_PARAM, trimmedFields);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Find the user-added properties and set them as query parameters on the URL
|
||||||
|
for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
|
||||||
|
PropertyDescriptor pd = property.getKey();
|
||||||
|
if (pd.isDynamic()) {
|
||||||
|
if (property.getValue() != null) {
|
||||||
|
builder.addQueryParameter(pd.getName(), context.getProperty(pd).evaluateAttributeExpressions().getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return builder.build().url();
|
return builder.build().url();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.processors.elasticsearch;
|
package org.apache.nifi.processors.elasticsearch;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
@ -32,6 +33,7 @@ import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
import org.apache.nifi.annotation.behavior.Stateful;
|
import org.apache.nifi.annotation.behavior.Stateful;
|
||||||
|
@ -52,7 +54,6 @@ import org.apache.nifi.processor.ProcessSession;
|
||||||
import org.apache.nifi.processor.Relationship;
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.stream.io.ByteArrayInputStream;
|
|
||||||
import org.codehaus.jackson.JsonNode;
|
import org.codehaus.jackson.JsonNode;
|
||||||
|
|
||||||
import okhttp3.HttpUrl;
|
import okhttp3.HttpUrl;
|
||||||
|
@ -73,18 +74,19 @@ import okhttp3.ResponseBody;
|
||||||
@WritesAttributes({
|
@WritesAttributes({
|
||||||
@WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
|
@WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
|
||||||
@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") })
|
@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") })
|
||||||
|
@DynamicProperty(
|
||||||
|
name = "A URL query parameter",
|
||||||
|
value = "The value to set it to",
|
||||||
|
supportsExpressionLanguage = true,
|
||||||
|
description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing")
|
||||||
@Stateful(description = "After each successful scroll page, the latest scroll_id is persisted in scrollId as input for the next scroll call. "
|
@Stateful(description = "After each successful scroll page, the latest scroll_id is persisted in scrollId as input for the next scroll call. "
|
||||||
+ "Once the entire query is complete, finishedQuery state will be set to true, and the processor will not execute unless this is cleared.", scopes = { Scope.LOCAL })
|
+ "Once the entire query is complete, finishedQuery state will be set to true, and the processor will not execute unless this is cleared.", scopes = { Scope.LOCAL })
|
||||||
public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
||||||
|
|
||||||
private static final String FINISHED_QUERY_STATE = "finishedQuery";
|
private static final String FINISHED_QUERY_STATE = "finishedQuery";
|
||||||
private static final String SCROLL_ID_STATE = "scrollId";
|
private static final String SCROLL_ID_STATE = "scrollId";
|
||||||
private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
|
|
||||||
private static final String QUERY_QUERY_PARAM = "q";
|
|
||||||
private static final String SORT_QUERY_PARAM = "sort";
|
|
||||||
private static final String SCROLL_QUERY_PARAM = "scroll";
|
private static final String SCROLL_QUERY_PARAM = "scroll";
|
||||||
private static final String SCROLL_ID_QUERY_PARAM = "scroll_id";
|
private static final String SCROLL_ID_QUERY_PARAM = "scroll_id";
|
||||||
private static final String SIZE_QUERY_PARAM = "size";
|
|
||||||
|
|
||||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||||
.name("success")
|
.name("success")
|
||||||
|
@ -249,7 +251,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
|
||||||
.getValue());
|
.getValue());
|
||||||
if (scrollId != null) {
|
if (scrollId != null) {
|
||||||
final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort,
|
final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort,
|
||||||
scrollId, pageSize, scroll);
|
scrollId, pageSize, scroll, context);
|
||||||
final long startNanos = System.nanoTime();
|
final long startNanos = System.nanoTime();
|
||||||
|
|
||||||
final Response getResponse = sendRequestToElasticsearch(okHttpClient, scrollurl,
|
final Response getResponse = sendRequestToElasticsearch(okHttpClient, scrollurl,
|
||||||
|
@ -262,7 +264,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
|
||||||
|
|
||||||
// read the url property from the context
|
// read the url property from the context
|
||||||
final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
|
final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
|
||||||
scrollId, pageSize, scroll);
|
scrollId, pageSize, scroll, context);
|
||||||
final long startNanos = System.nanoTime();
|
final long startNanos = System.nanoTime();
|
||||||
|
|
||||||
final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
|
final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
|
||||||
|
@ -399,7 +401,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
|
||||||
}
|
}
|
||||||
|
|
||||||
private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
|
private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
|
||||||
String sort, String scrollId, int pageSize, String scroll) throws MalformedURLException {
|
String sort, String scrollId, int pageSize, String scroll, ProcessContext context) throws MalformedURLException {
|
||||||
if (StringUtils.isEmpty(baseUrl)) {
|
if (StringUtils.isEmpty(baseUrl)) {
|
||||||
throw new MalformedURLException("Base URL cannot be null");
|
throw new MalformedURLException("Base URL cannot be null");
|
||||||
}
|
}
|
||||||
|
@ -427,6 +429,17 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
|
||||||
}
|
}
|
||||||
builder.addQueryParameter(SCROLL_QUERY_PARAM, scroll);
|
builder.addQueryParameter(SCROLL_QUERY_PARAM, scroll);
|
||||||
|
|
||||||
|
// Find the user-added properties and set them as query parameters on the URL
|
||||||
|
for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {
|
||||||
|
PropertyDescriptor pd = property.getKey();
|
||||||
|
if (pd.isDynamic()) {
|
||||||
|
if (property.getValue() != null) {
|
||||||
|
builder.addQueryParameter(pd.getName(), context.getProperty(pd).evaluateAttributeExpressions().getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
return builder.build().url();
|
return builder.build().url();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,6 +44,7 @@ import java.util.HashMap;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
@ -293,6 +294,33 @@ public class TestFetchElasticsearchHttp {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFetchElasticsearchOnTriggerQueryParameter() throws IOException {
|
||||||
|
FetchElasticsearchHttpTestProcessor p = new FetchElasticsearchHttpTestProcessor(true); // all docs are found
|
||||||
|
p.setExpectedUrl("http://127.0.0.1:9200/doc/status/28039652140?_source_include=id&myparam=myvalue");
|
||||||
|
runner = TestRunners.newTestRunner(p);
|
||||||
|
runner.setValidateExpressionUsage(true);
|
||||||
|
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
|
||||||
|
|
||||||
|
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
|
||||||
|
runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
|
||||||
|
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
|
||||||
|
runner.setProperty(FetchElasticsearchHttp.FIELDS, "id");
|
||||||
|
|
||||||
|
// Set dynamic property, to be added to the URL as a query parameter
|
||||||
|
runner.setProperty("myparam", "myvalue");
|
||||||
|
|
||||||
|
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||||
|
put("doc_id", "28039652140");
|
||||||
|
}});
|
||||||
|
runner.run(1, true, true);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1);
|
||||||
|
final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0);
|
||||||
|
assertNotNull(out);
|
||||||
|
out.assertAttributeEquals("doc_id", "28039652140");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Test class that extends the processor in order to inject/mock behavior
|
* A Test class that extends the processor in order to inject/mock behavior
|
||||||
*/
|
*/
|
||||||
|
@ -302,8 +330,8 @@ public class TestFetchElasticsearchHttp {
|
||||||
OkHttpClient client;
|
OkHttpClient client;
|
||||||
int statusCode = 200;
|
int statusCode = 200;
|
||||||
String statusMessage = "OK";
|
String statusMessage = "OK";
|
||||||
|
|
||||||
URL url = null;
|
URL url = null;
|
||||||
|
String expectedUrl = null;
|
||||||
|
|
||||||
FetchElasticsearchHttpTestProcessor(boolean documentExists) {
|
FetchElasticsearchHttpTestProcessor(boolean documentExists) {
|
||||||
this.documentExists = documentExists;
|
this.documentExists = documentExists;
|
||||||
|
@ -318,6 +346,10 @@ public class TestFetchElasticsearchHttp {
|
||||||
statusMessage = message;
|
statusMessage = message;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void setExpectedUrl(String url) {
|
||||||
|
expectedUrl = url;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
|
protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
|
||||||
client = mock(OkHttpClient.class);
|
client = mock(OkHttpClient.class);
|
||||||
|
@ -327,6 +359,7 @@ public class TestFetchElasticsearchHttp {
|
||||||
@Override
|
@Override
|
||||||
public Call answer(InvocationOnMock invocationOnMock) throws Throwable {
|
public Call answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||||
Request realRequest = (Request) invocationOnMock.getArguments()[0];
|
Request realRequest = (Request) invocationOnMock.getArguments()[0];
|
||||||
|
assertTrue((expectedUrl == null) || (expectedUrl.equals(realRequest.url().toString())));
|
||||||
StringBuilder sb = new StringBuilder("{\"_index\":\"randomuser.me\",\"_type\":\"user\",\"_id\":\"0\",\"_version\":2,");
|
StringBuilder sb = new StringBuilder("{\"_index\":\"randomuser.me\",\"_type\":\"user\",\"_id\":\"0\",\"_version\":2,");
|
||||||
if (documentExists) {
|
if (documentExists) {
|
||||||
sb.append("\"found\":true,\"_source\":{\"gender\":\"female\",\"name\":{\"title\":\"Ms\",\"first\":\"Joan\",\"last\":\"Smith\"}}");
|
sb.append("\"found\":true,\"_source\":{\"gender\":\"female\",\"name\":{\"title\":\"Ms\",\"first\":\"Joan\",\"last\":\"Smith\"}}");
|
||||||
|
|
|
@ -38,9 +38,11 @@ import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.ConnectException;
|
import java.net.ConnectException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
@ -53,7 +55,7 @@ public class TestPutElasticsearchHttp {
|
||||||
@Before
|
@Before
|
||||||
public void once() throws IOException {
|
public void once() throws IOException {
|
||||||
ClassLoader classloader = Thread.currentThread().getContextClassLoader();
|
ClassLoader classloader = Thread.currentThread().getContextClassLoader();
|
||||||
docExample = IOUtils.toString(classloader.getResourceAsStream("DocumentExample.json")).getBytes();
|
docExample = IOUtils.toString(classloader.getResourceAsStream("DocumentExample.json"), StandardCharsets.UTF_8).getBytes();
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -322,6 +324,32 @@ public class TestPutElasticsearchHttp {
|
||||||
assertNotNull(out);
|
assertNotNull(out);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPutElasticSearchOnTriggerQueryParameter() throws IOException {
|
||||||
|
PutElasticsearchTestProcessor p = new PutElasticsearchTestProcessor(false); // no failures
|
||||||
|
p.setExpectedUrl("http://127.0.0.1:9200/_bulk?pipeline=my-pipeline");
|
||||||
|
|
||||||
|
runner = TestRunners.newTestRunner(p);
|
||||||
|
runner.setValidateExpressionUsage(true);
|
||||||
|
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
|
||||||
|
runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
|
||||||
|
runner.setProperty(PutElasticsearchHttp.TYPE, "status");
|
||||||
|
runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
|
||||||
|
runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
|
||||||
|
|
||||||
|
// Set dynamic property, to be added to the URL as a query parameter
|
||||||
|
runner.setProperty("pipeline", "my-pipeline");
|
||||||
|
|
||||||
|
runner.enqueue(docExample, new HashMap<String, String>() {{
|
||||||
|
put("doc_id", "28039652140");
|
||||||
|
}});
|
||||||
|
runner.run(1, true, true);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1);
|
||||||
|
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0);
|
||||||
|
assertNotNull(out);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Test class that extends the processor in order to inject/mock behavior
|
* A Test class that extends the processor in order to inject/mock behavior
|
||||||
*/
|
*/
|
||||||
|
@ -330,6 +358,7 @@ public class TestPutElasticsearchHttp {
|
||||||
OkHttpClient client;
|
OkHttpClient client;
|
||||||
int statusCode = 200;
|
int statusCode = 200;
|
||||||
String statusMessage = "OK";
|
String statusMessage = "OK";
|
||||||
|
String expectedUrl = null;
|
||||||
|
|
||||||
PutElasticsearchTestProcessor(boolean responseHasFailures) {
|
PutElasticsearchTestProcessor(boolean responseHasFailures) {
|
||||||
this.responseHasFailures = responseHasFailures;
|
this.responseHasFailures = responseHasFailures;
|
||||||
|
@ -340,6 +369,10 @@ public class TestPutElasticsearchHttp {
|
||||||
statusMessage = message;
|
statusMessage = message;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void setExpectedUrl(String url) {
|
||||||
|
expectedUrl = url;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
|
protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
|
||||||
client = mock(OkHttpClient.class);
|
client = mock(OkHttpClient.class);
|
||||||
|
@ -351,6 +384,7 @@ public class TestPutElasticsearchHttp {
|
||||||
final Call call = mock(Call.class);
|
final Call call = mock(Call.class);
|
||||||
if (statusCode != -1) {
|
if (statusCode != -1) {
|
||||||
Request realRequest = (Request) invocationOnMock.getArguments()[0];
|
Request realRequest = (Request) invocationOnMock.getArguments()[0];
|
||||||
|
assertTrue((expectedUrl == null) || (expectedUrl.equals(realRequest.url().toString())));
|
||||||
StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \"");
|
StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \"");
|
||||||
sb.append(responseHasFailures);
|
sb.append(responseHasFailures);
|
||||||
sb.append("\", \"items\": [");
|
sb.append("\", \"items\": [");
|
||||||
|
|
|
@ -44,6 +44,7 @@ import java.util.List;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
@ -332,6 +333,37 @@ public class TestPutElasticsearchHttpRecord {
|
||||||
assertNotNull(out);
|
assertNotNull(out);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPutElasticSearchOnTriggerQueryParameter() throws IOException {
|
||||||
|
PutElasticsearchHttpRecordTestProcessor p = new PutElasticsearchHttpRecordTestProcessor(false); // no failures
|
||||||
|
p.setExpectedUrl("http://127.0.0.1:9200/_bulk?pipeline=my-pipeline");
|
||||||
|
runner = TestRunners.newTestRunner(p);
|
||||||
|
generateTestData();
|
||||||
|
runner.setValidateExpressionUsage(true);
|
||||||
|
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
|
||||||
|
|
||||||
|
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
|
||||||
|
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
|
||||||
|
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
|
||||||
|
|
||||||
|
// Set dynamic property, to be added to the URL as a query parameter
|
||||||
|
runner.setProperty("pipeline", "my-pipeline");
|
||||||
|
|
||||||
|
runner.enqueue(new byte[0], new HashMap<String, String>() {{
|
||||||
|
put("doc_id", "28039652140");
|
||||||
|
}});
|
||||||
|
runner.run(1, true, true);
|
||||||
|
|
||||||
|
runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1);
|
||||||
|
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0);
|
||||||
|
assertNotNull(out);
|
||||||
|
out.assertAttributeEquals("doc_id", "28039652140");
|
||||||
|
List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
|
||||||
|
assertNotNull(provEvents);
|
||||||
|
assertEquals(1, provEvents.size());
|
||||||
|
assertEquals(ProvenanceEventType.SEND, provEvents.get(0).getEventType());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Test class that extends the processor in order to inject/mock behavior
|
* A Test class that extends the processor in order to inject/mock behavior
|
||||||
*/
|
*/
|
||||||
|
@ -340,6 +372,7 @@ public class TestPutElasticsearchHttpRecord {
|
||||||
OkHttpClient client;
|
OkHttpClient client;
|
||||||
int statusCode = 200;
|
int statusCode = 200;
|
||||||
String statusMessage = "OK";
|
String statusMessage = "OK";
|
||||||
|
String expectedUrl = null;
|
||||||
|
|
||||||
PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) {
|
PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) {
|
||||||
this.responseHasFailures = responseHasFailures;
|
this.responseHasFailures = responseHasFailures;
|
||||||
|
@ -350,6 +383,10 @@ public class TestPutElasticsearchHttpRecord {
|
||||||
statusMessage = message;
|
statusMessage = message;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void setExpectedUrl(String url) {
|
||||||
|
expectedUrl = url;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
|
protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
|
||||||
client = mock(OkHttpClient.class);
|
client = mock(OkHttpClient.class);
|
||||||
|
@ -358,6 +395,7 @@ public class TestPutElasticsearchHttpRecord {
|
||||||
final Call call = mock(Call.class);
|
final Call call = mock(Call.class);
|
||||||
if (statusCode != -1) {
|
if (statusCode != -1) {
|
||||||
Request realRequest = (Request) invocationOnMock.getArguments()[0];
|
Request realRequest = (Request) invocationOnMock.getArguments()[0];
|
||||||
|
assertTrue((expectedUrl == null) || (expectedUrl.equals(realRequest.url().toString())));
|
||||||
StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \"");
|
StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \"");
|
||||||
sb.append(responseHasFailures);
|
sb.append(responseHasFailures);
|
||||||
sb.append("\", \"items\": [");
|
sb.append("\", \"items\": [");
|
||||||
|
|
|
@ -18,11 +18,13 @@ package org.apache.nifi.processors.elasticsearch;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -360,6 +362,22 @@ public class TestQueryElasticsearchHttp {
|
||||||
runner.run(1, true, true);
|
runner.run(1, true, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQueryElasticsearchOnTrigger_withQueryParameters() throws IOException {
|
||||||
|
QueryElasticsearchHttpTestProcessor p = new QueryElasticsearchHttpTestProcessor();
|
||||||
|
p.setExpectedParam("myparam=myvalue");
|
||||||
|
runner = TestRunners.newTestRunner(p);
|
||||||
|
runner.setValidateExpressionUsage(true);
|
||||||
|
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
|
||||||
|
|
||||||
|
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
|
||||||
|
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
|
||||||
|
runner.setProperty(QueryElasticsearchHttp.QUERY, "source:Twitter");
|
||||||
|
// Set dynamic property, to be added to the URL as a query parameter
|
||||||
|
runner.setProperty("myparam", "myvalue");
|
||||||
|
runAndVerifySuccess(true);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Test class that extends the processor in order to inject/mock behavior
|
* A Test class that extends the processor in order to inject/mock behavior
|
||||||
*/
|
*/
|
||||||
|
@ -376,6 +394,8 @@ public class TestQueryElasticsearchHttp {
|
||||||
List<String> pages = Arrays.asList(getDoc("query-page1.json"), getDoc("query-page2.json"),
|
List<String> pages = Arrays.asList(getDoc("query-page1.json"), getDoc("query-page2.json"),
|
||||||
getDoc("query-page3.json"));
|
getDoc("query-page3.json"));
|
||||||
|
|
||||||
|
String expectedParam = null;
|
||||||
|
|
||||||
public void setExceptionToThrow(Exception exceptionToThrow) {
|
public void setExceptionToThrow(Exception exceptionToThrow) {
|
||||||
this.exceptionToThrow = exceptionToThrow;
|
this.exceptionToThrow = exceptionToThrow;
|
||||||
}
|
}
|
||||||
|
@ -392,6 +412,16 @@ public class TestQueryElasticsearchHttp {
|
||||||
this.setStatus(code, message, 1);
|
this.setStatus(code, message, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets an query parameter (name=value) expected to be at the end of the URL for the query operation
|
||||||
|
*
|
||||||
|
* @param param
|
||||||
|
* The parameter to expect
|
||||||
|
*/
|
||||||
|
void setExpectedParam(String param) {
|
||||||
|
expectedParam = param;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the status code and message for the runNumber-th query
|
* Sets the status code and message for the runNumber-th query
|
||||||
*
|
*
|
||||||
|
@ -431,6 +461,7 @@ public class TestQueryElasticsearchHttp {
|
||||||
@Override
|
@Override
|
||||||
public Call answer(InvocationOnMock invocationOnMock) throws Throwable {
|
public Call answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||||
Request realRequest = (Request) invocationOnMock.getArguments()[0];
|
Request realRequest = (Request) invocationOnMock.getArguments()[0];
|
||||||
|
assertTrue((expectedParam == null) || (realRequest.url().toString().endsWith(expectedParam)));
|
||||||
Response mockResponse = new Response.Builder()
|
Response mockResponse = new Response.Builder()
|
||||||
.request(realRequest)
|
.request(realRequest)
|
||||||
.protocol(Protocol.HTTP_1_1)
|
.protocol(Protocol.HTTP_1_1)
|
||||||
|
@ -456,8 +487,7 @@ public class TestQueryElasticsearchHttp {
|
||||||
|
|
||||||
private static String getDoc(String filename) {
|
private static String getDoc(String filename) {
|
||||||
try {
|
try {
|
||||||
return IOUtils.toString(QueryElasticsearchHttp.class.getClassLoader()
|
return IOUtils.toString(QueryElasticsearchHttp.class.getClassLoader().getResourceAsStream(filename), StandardCharsets.UTF_8);
|
||||||
.getResourceAsStream(filename));
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
System.out.println("Error reading document " + filename);
|
System.out.println("Error reading document " + filename);
|
||||||
return "";
|
return "";
|
||||||
|
|
|
@ -17,11 +17,13 @@
|
||||||
package org.apache.nifi.processors.elasticsearch;
|
package org.apache.nifi.processors.elasticsearch;
|
||||||
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -316,6 +318,24 @@ public class TestScrollElasticsearchHttp {
|
||||||
runner.assertTransferCount(ScrollElasticsearchHttp.REL_FAILURE, 1);
|
runner.assertTransferCount(ScrollElasticsearchHttp.REL_FAILURE, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScrollElasticsearchOnTrigger_withQueryParameter() throws IOException {
|
||||||
|
ScrollElasticsearchHttpTestProcessor p = new ScrollElasticsearchHttpTestProcessor();
|
||||||
|
p.setExpectedParam("myparam=myvalue");
|
||||||
|
runner = TestRunners.newTestRunner(p);
|
||||||
|
runner.setValidateExpressionUsage(true);
|
||||||
|
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
|
||||||
|
|
||||||
|
runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
|
||||||
|
runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
|
||||||
|
runner.setProperty(ScrollElasticsearchHttp.QUERY, "source:WZ");
|
||||||
|
runner.setProperty(ScrollElasticsearchHttp.PAGE_SIZE, "2");
|
||||||
|
// Set dynamic property, to be added to the URL as a query parameter
|
||||||
|
runner.setProperty("myparam", "myvalue");
|
||||||
|
runner.setIncomingConnection(false);
|
||||||
|
runAndVerifySuccess();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Test class that extends the processor in order to inject/mock behavior
|
* A Test class that extends the processor in order to inject/mock behavior
|
||||||
*/
|
*/
|
||||||
|
@ -332,6 +352,8 @@ public class TestScrollElasticsearchHttp {
|
||||||
List<String> pages = Arrays.asList(getDoc("scroll-page1.json"),
|
List<String> pages = Arrays.asList(getDoc("scroll-page1.json"),
|
||||||
getDoc("scroll-page2.json"), getDoc("scroll-page3.json"));
|
getDoc("scroll-page2.json"), getDoc("scroll-page3.json"));
|
||||||
|
|
||||||
|
String expectedParam = null;
|
||||||
|
|
||||||
public void setExceptionToThrow(Exception exceptionToThrow) {
|
public void setExceptionToThrow(Exception exceptionToThrow) {
|
||||||
this.exceptionToThrow = exceptionToThrow;
|
this.exceptionToThrow = exceptionToThrow;
|
||||||
}
|
}
|
||||||
|
@ -364,6 +386,16 @@ public class TestScrollElasticsearchHttp {
|
||||||
this.runNumber = runNumber;
|
this.runNumber = runNumber;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets an query parameter (name=value) expected to be at the end of the URL for the query operation
|
||||||
|
*
|
||||||
|
* @param param
|
||||||
|
* The parameter to expect
|
||||||
|
*/
|
||||||
|
void setExpectedParam(String param) {
|
||||||
|
expectedParam = param;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
|
protected void createElasticsearchClient(ProcessContext context) throws ProcessException {
|
||||||
client = mock(OkHttpClient.class);
|
client = mock(OkHttpClient.class);
|
||||||
|
@ -387,6 +419,7 @@ public class TestScrollElasticsearchHttp {
|
||||||
@Override
|
@Override
|
||||||
public Call answer(InvocationOnMock invocationOnMock) throws Throwable {
|
public Call answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||||
Request realRequest = (Request) invocationOnMock.getArguments()[0];
|
Request realRequest = (Request) invocationOnMock.getArguments()[0];
|
||||||
|
assertTrue((expectedParam == null) || (realRequest.url().toString().endsWith(expectedParam)));
|
||||||
Response mockResponse = new Response.Builder()
|
Response mockResponse = new Response.Builder()
|
||||||
.request(realRequest)
|
.request(realRequest)
|
||||||
.protocol(Protocol.HTTP_1_1)
|
.protocol(Protocol.HTTP_1_1)
|
||||||
|
@ -412,8 +445,7 @@ public class TestScrollElasticsearchHttp {
|
||||||
|
|
||||||
private static String getDoc(String filename) {
|
private static String getDoc(String filename) {
|
||||||
try {
|
try {
|
||||||
return IOUtils.toString(ScrollElasticsearchHttp.class.getClassLoader()
|
return IOUtils.toString(ScrollElasticsearchHttp.class.getClassLoader().getResourceAsStream(filename), StandardCharsets.UTF_8);
|
||||||
.getResourceAsStream(filename));
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
System.out.println("Error reading document " + filename);
|
System.out.println("Error reading document " + filename);
|
||||||
return "";
|
return "";
|
||||||
|
|
Loading…
Reference in New Issue