diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java index d67ce6c540..df986b1937 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java @@ -50,6 +50,12 @@ import java.util.concurrent.atomic.AtomicReference; */ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElasticsearchProcessor { + static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include"; + static final String QUERY_QUERY_PARAM = "q"; + static final String SORT_QUERY_PARAM = "sort"; + static final String SIZE_QUERY_PARAM = "size"; + + public static final PropertyDescriptor ES_URL = new PropertyDescriptor.Builder() .name("elasticsearch-http-url") .displayName("Elasticsearch URL") @@ -97,6 +103,17 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic private final AtomicReference okHttpClientAtomicReference = new AtomicReference<>(); + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); + } + @Override protected void createElasticsearchClient(ProcessContext context) throws ProcessException { okHttpClientAtomicReference.set(null); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java index 06be4d5456..441cc109a2 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java @@ -21,6 +21,7 @@ import okhttp3.OkHttpClient; import okhttp3.Response; import okhttp3.ResponseBody; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -37,9 +38,9 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.stream.io.ByteArrayInputStream; import org.codehaus.jackson.JsonNode; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; @@ -47,6 +48,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -65,10 +67,13 @@ import java.util.stream.Stream; @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"), @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") }) +@DynamicProperty( + name = "A URL query parameter", + value = "The value to set it to", + supportsExpressionLanguage = true, + description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing") public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { - private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include"; - public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("All FlowFiles that are read from Elasticsearch are routed to this relationship.") @@ -212,7 +217,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { // read the url property from the context final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue()); - final URL url = buildRequestURL(urlstr, docId, index, docType, fields); + final URL url = buildRequestURL(urlstr, docId, index, docType, fields, context); final long startNanos = System.nanoTime(); getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null); @@ -304,7 +309,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { } } - private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields) throws MalformedURLException { + private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields, ProcessContext context) throws MalformedURLException { if (StringUtils.isEmpty(baseUrl)) { throw new MalformedURLException("Base URL cannot be null"); } @@ -317,6 +322,16 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields); } + // Find the user-added properties and set them as query parameters on the URL + for (Map.Entry property : context.getProperties().entrySet()) { + PropertyDescriptor pd = property.getKey(); + if (pd.isDynamic()) { + if (property.getValue() != null) { + builder.addQueryParameter(pd.getName(), context.getProperty(pd).evaluateAttributeExpressions().getValue()); + } + } + } + return builder.build().url(); } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java index 70884be002..52892a06f5 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java @@ -16,12 +16,14 @@ */ package org.apache.nifi.processors.elasticsearch; +import okhttp3.HttpUrl; import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.RequestBody; import okhttp3.Response; import okhttp3.ResponseBody; import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -39,13 +41,12 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.stream.io.ByteArrayInputStream; import org.apache.nifi.util.StringUtils; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.node.ArrayNode; +import java.io.ByteArrayInputStream; import java.io.IOException; -import java.net.MalformedURLException; import java.net.URL; import java.nio.charset.Charset; import java.util.ArrayList; @@ -54,6 +55,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import static org.apache.commons.lang3.StringUtils.trimToEmpty; @@ -65,6 +67,11 @@ import static org.apache.commons.lang3.StringUtils.trimToEmpty; @Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http"}) @CapabilityDescription("Writes the contents of a FlowFile to Elasticsearch, using the specified parameters such as " + "the index to insert into and the type of the document.") +@DynamicProperty( + name = "A URL query parameter", + value = "The value to set it to", + supportsExpressionLanguage = true, + description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing") public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") @@ -223,14 +230,18 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { final StringBuilder sb = new StringBuilder(); final String baseUrl = trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue()); - final URL url; - try { - url = new URL((baseUrl.endsWith("/") ? baseUrl : baseUrl + "/") + "_bulk"); - } catch (MalformedURLException mue) { - // Since we have a URL validator, something has gone very wrong, throw a ProcessException - context.yield(); - throw new ProcessException(mue); + HttpUrl.Builder urlBuilder = HttpUrl.parse(baseUrl).newBuilder().addPathSegment("_bulk"); + + // Find the user-added properties and set them as query parameters on the URL + for (Map.Entry property : context.getProperties().entrySet()) { + PropertyDescriptor pd = property.getKey(); + if (pd.isDynamic()) { + if (property.getValue() != null) { + urlBuilder = urlBuilder.addQueryParameter(pd.getName(), context.getProperty(pd).evaluateAttributeExpressions().getValue()); + } + } } + final URL url = urlBuilder.build().url(); for (FlowFile file : flowFiles) { final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue(); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java index 24d0057590..c618f6c6ce 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java @@ -16,11 +16,13 @@ */ package org.apache.nifi.processors.elasticsearch; +import okhttp3.HttpUrl; import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.RequestBody; import okhttp3.Response; import okhttp3.ResponseBody; +import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -66,7 +68,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.math.BigInteger; -import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; import java.util.Collection; @@ -87,6 +88,11 @@ import static org.apache.commons.lang3.StringUtils.trimToEmpty; + "the index to insert into and the type of the document, as well as the operation type (index, upsert, delete, etc.). Note: The Bulk API is used to " + "send the records. This means that the entire contents of the incoming flow file are read into memory, and each record is transformed into a JSON document " + "which is added to a single HTTP request body. For very large flow files (files with a large number of records, e.g.), this could cause memory usage issues.") +@DynamicProperty( + name = "A URL query parameter", + value = "The value to set it to", + supportsExpressionLanguage = true, + description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing") public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcessor { public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") @@ -239,14 +245,18 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess final ComponentLog logger = getLogger(); final String baseUrl = trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue()); - final URL url; - try { - url = new URL((baseUrl.endsWith("/") ? baseUrl : baseUrl + "/") + "_bulk"); - } catch (MalformedURLException mue) { - // Since we have a URL validator, something has gone very wrong, throw a ProcessException - context.yield(); - throw new ProcessException(mue); + HttpUrl.Builder urlBuilder = HttpUrl.parse(baseUrl).newBuilder().addPathSegment("_bulk"); + + // Find the user-added properties and set them as query parameters on the URL + for (Map.Entry property : context.getProperties().entrySet()) { + PropertyDescriptor pd = property.getKey(); + if (pd.isDynamic()) { + if (property.getValue() != null) { + urlBuilder = urlBuilder.addQueryParameter(pd.getName(), context.getProperty(pd).evaluateAttributeExpressions().getValue()); + } + } } + final URL url = urlBuilder.build().url(); final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue(); if (StringUtils.isEmpty(index)) { diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java index f65816e125..b25f1738d4 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.elasticsearch; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; @@ -33,6 +34,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -49,7 +51,6 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.stream.io.ByteArrayInputStream; import org.codehaus.jackson.JsonNode; import okhttp3.HttpUrl; @@ -73,13 +74,14 @@ import okhttp3.ResponseBody; @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type"), @WritesAttribute(attribute = "es.result.*", description = "If Target is 'Flow file attributes', the JSON attributes of " + "each result will be placed into corresponding attributes with this prefix.") }) +@DynamicProperty( + name = "A URL query parameter", + value = "The value to set it to", + supportsExpressionLanguage = true, + description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing") public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { - private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include"; - private static final String QUERY_QUERY_PARAM = "q"; - private static final String SORT_QUERY_PARAM = "sort"; private static final String FROM_QUERY_PARAM = "from"; - private static final String SIZE_QUERY_PARAM = "size"; public static final String TARGET_FLOW_FILE_CONTENT = "Flow file content"; public static final String TARGET_FLOW_FILE_ATTRIBUTES = "Flow file attributes"; @@ -281,7 +283,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { } final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort, - mPageSize, fromIndex); + mPageSize, fromIndex, context); final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl, username, password, "GET", null); @@ -403,7 +405,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { } private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields, - String sort, int pageSize, int fromIndex) throws MalformedURLException { + String sort, int pageSize, int fromIndex, ProcessContext context) throws MalformedURLException { if (StringUtils.isEmpty(baseUrl)) { throw new MalformedURLException("Base URL cannot be null"); } @@ -425,6 +427,16 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { builder.addQueryParameter(SORT_QUERY_PARAM, trimmedFields); } + // Find the user-added properties and set them as query parameters on the URL + for (Map.Entry property : context.getProperties().entrySet()) { + PropertyDescriptor pd = property.getKey(); + if (pd.isDynamic()) { + if (property.getValue() != null) { + builder.addQueryParameter(pd.getName(), context.getProperty(pd).evaluateAttributeExpressions().getValue()); + } + } + } + return builder.build().url(); } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java index 0442bf7bf1..91b9176c31 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.elasticsearch; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; @@ -32,6 +33,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.Stateful; @@ -52,7 +54,6 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.stream.io.ByteArrayInputStream; import org.codehaus.jackson.JsonNode; import okhttp3.HttpUrl; @@ -73,18 +74,19 @@ import okhttp3.ResponseBody; @WritesAttributes({ @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"), @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") }) +@DynamicProperty( + name = "A URL query parameter", + value = "The value to set it to", + supportsExpressionLanguage = true, + description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing") @Stateful(description = "After each successful scroll page, the latest scroll_id is persisted in scrollId as input for the next scroll call. " + "Once the entire query is complete, finishedQuery state will be set to true, and the processor will not execute unless this is cleared.", scopes = { Scope.LOCAL }) public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor { private static final String FINISHED_QUERY_STATE = "finishedQuery"; private static final String SCROLL_ID_STATE = "scrollId"; - private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include"; - private static final String QUERY_QUERY_PARAM = "q"; - private static final String SORT_QUERY_PARAM = "sort"; private static final String SCROLL_QUERY_PARAM = "scroll"; private static final String SCROLL_ID_QUERY_PARAM = "scroll_id"; - private static final String SIZE_QUERY_PARAM = "size"; public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -249,7 +251,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor .getValue()); if (scrollId != null) { final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort, - scrollId, pageSize, scroll); + scrollId, pageSize, scroll, context); final long startNanos = System.nanoTime(); final Response getResponse = sendRequestToElasticsearch(okHttpClient, scrollurl, @@ -262,7 +264,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor // read the url property from the context final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort, - scrollId, pageSize, scroll); + scrollId, pageSize, scroll, context); final long startNanos = System.nanoTime(); final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl, @@ -399,7 +401,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor } private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields, - String sort, String scrollId, int pageSize, String scroll) throws MalformedURLException { + String sort, String scrollId, int pageSize, String scroll, ProcessContext context) throws MalformedURLException { if (StringUtils.isEmpty(baseUrl)) { throw new MalformedURLException("Base URL cannot be null"); } @@ -427,6 +429,17 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor } builder.addQueryParameter(SCROLL_QUERY_PARAM, scroll); + // Find the user-added properties and set them as query parameters on the URL + for (Map.Entry property : context.getProperties().entrySet()) { + PropertyDescriptor pd = property.getKey(); + if (pd.isDynamic()) { + if (property.getValue() != null) { + builder.addQueryParameter(pd.getName(), context.getProperty(pd).evaluateAttributeExpressions().getValue()); + } + } + } + + return builder.build().url(); } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java index 346ead499e..de56b49686 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java @@ -44,6 +44,7 @@ import java.util.HashMap; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -293,6 +294,33 @@ public class TestFetchElasticsearchHttp { } + @Test + public void testFetchElasticsearchOnTriggerQueryParameter() throws IOException { + FetchElasticsearchHttpTestProcessor p = new FetchElasticsearchHttpTestProcessor(true); // all docs are found + p.setExpectedUrl("http://127.0.0.1:9200/doc/status/28039652140?_source_include=id&myparam=myvalue"); + runner = TestRunners.newTestRunner(p); + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + runner.setProperty(FetchElasticsearchHttp.FIELDS, "id"); + + // Set dynamic property, to be added to the URL as a query parameter + runner.setProperty("myparam", "myvalue"); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + /** * A Test class that extends the processor in order to inject/mock behavior */ @@ -302,8 +330,8 @@ public class TestFetchElasticsearchHttp { OkHttpClient client; int statusCode = 200; String statusMessage = "OK"; - URL url = null; + String expectedUrl = null; FetchElasticsearchHttpTestProcessor(boolean documentExists) { this.documentExists = documentExists; @@ -318,6 +346,10 @@ public class TestFetchElasticsearchHttp { statusMessage = message; } + void setExpectedUrl(String url) { + expectedUrl = url; + } + @Override protected void createElasticsearchClient(ProcessContext context) throws ProcessException { client = mock(OkHttpClient.class); @@ -327,6 +359,7 @@ public class TestFetchElasticsearchHttp { @Override public Call answer(InvocationOnMock invocationOnMock) throws Throwable { Request realRequest = (Request) invocationOnMock.getArguments()[0]; + assertTrue((expectedUrl == null) || (expectedUrl.equals(realRequest.url().toString()))); StringBuilder sb = new StringBuilder("{\"_index\":\"randomuser.me\",\"_type\":\"user\",\"_id\":\"0\",\"_version\":2,"); if (documentExists) { sb.append("\"found\":true,\"_source\":{\"gender\":\"female\",\"name\":{\"title\":\"Ms\",\"first\":\"Joan\",\"last\":\"Smith\"}}"); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java index a8575d4bc2..36aa94e3e8 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java @@ -38,9 +38,11 @@ import org.mockito.stubbing.Answer; import java.io.IOException; import java.net.ConnectException; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -53,7 +55,7 @@ public class TestPutElasticsearchHttp { @Before public void once() throws IOException { ClassLoader classloader = Thread.currentThread().getContextClassLoader(); - docExample = IOUtils.toString(classloader.getResourceAsStream("DocumentExample.json")).getBytes(); + docExample = IOUtils.toString(classloader.getResourceAsStream("DocumentExample.json"), StandardCharsets.UTF_8).getBytes(); } @After @@ -322,6 +324,32 @@ public class TestPutElasticsearchHttp { assertNotNull(out); } + @Test + public void testPutElasticSearchOnTriggerQueryParameter() throws IOException { + PutElasticsearchTestProcessor p = new PutElasticsearchTestProcessor(false); // no failures + p.setExpectedUrl("http://127.0.0.1:9200/_bulk?pipeline=my-pipeline"); + + runner = TestRunners.newTestRunner(p); + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + + // Set dynamic property, to be added to the URL as a query parameter + runner.setProperty("pipeline", "my-pipeline"); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + } + /** * A Test class that extends the processor in order to inject/mock behavior */ @@ -330,6 +358,7 @@ public class TestPutElasticsearchHttp { OkHttpClient client; int statusCode = 200; String statusMessage = "OK"; + String expectedUrl = null; PutElasticsearchTestProcessor(boolean responseHasFailures) { this.responseHasFailures = responseHasFailures; @@ -340,6 +369,10 @@ public class TestPutElasticsearchHttp { statusMessage = message; } + void setExpectedUrl(String url) { + expectedUrl = url; + } + @Override protected void createElasticsearchClient(ProcessContext context) throws ProcessException { client = mock(OkHttpClient.class); @@ -351,6 +384,7 @@ public class TestPutElasticsearchHttp { final Call call = mock(Call.class); if (statusCode != -1) { Request realRequest = (Request) invocationOnMock.getArguments()[0]; + assertTrue((expectedUrl == null) || (expectedUrl.equals(realRequest.url().toString()))); StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \""); sb.append(responseHasFailures); sb.append("\", \"items\": ["); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java index e93123647b..75fb6ec8a6 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java @@ -44,6 +44,7 @@ import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -332,6 +333,37 @@ public class TestPutElasticsearchHttpRecord { assertNotNull(out); } + @Test + public void testPutElasticSearchOnTriggerQueryParameter() throws IOException { + PutElasticsearchHttpRecordTestProcessor p = new PutElasticsearchHttpRecordTestProcessor(false); // no failures + p.setExpectedUrl("http://127.0.0.1:9200/_bulk?pipeline=my-pipeline"); + runner = TestRunners.newTestRunner(p); + generateTestData(); + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status"); + runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id"); + + // Set dynamic property, to be added to the URL as a query parameter + runner.setProperty("pipeline", "my-pipeline"); + + runner.enqueue(new byte[0], new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearchHttpRecord.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttpRecord.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + List provEvents = runner.getProvenanceEvents(); + assertNotNull(provEvents); + assertEquals(1, provEvents.size()); + assertEquals(ProvenanceEventType.SEND, provEvents.get(0).getEventType()); + } + /** * A Test class that extends the processor in order to inject/mock behavior */ @@ -340,6 +372,7 @@ public class TestPutElasticsearchHttpRecord { OkHttpClient client; int statusCode = 200; String statusMessage = "OK"; + String expectedUrl = null; PutElasticsearchHttpRecordTestProcessor(boolean responseHasFailures) { this.responseHasFailures = responseHasFailures; @@ -350,6 +383,10 @@ public class TestPutElasticsearchHttpRecord { statusMessage = message; } + void setExpectedUrl(String url) { + expectedUrl = url; + } + @Override protected void createElasticsearchClient(ProcessContext context) throws ProcessException { client = mock(OkHttpClient.class); @@ -358,6 +395,7 @@ public class TestPutElasticsearchHttpRecord { final Call call = mock(Call.class); if (statusCode != -1) { Request realRequest = (Request) invocationOnMock.getArguments()[0]; + assertTrue((expectedUrl == null) || (expectedUrl.equals(realRequest.url().toString()))); StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \""); sb.append(responseHasFailures); sb.append("\", \"items\": ["); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java index ccd74faed3..478949623e 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java @@ -18,11 +18,13 @@ package org.apache.nifi.processors.elasticsearch; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -360,6 +362,22 @@ public class TestQueryElasticsearchHttp { runner.run(1, true, true); } + @Test + public void testQueryElasticsearchOnTrigger_withQueryParameters() throws IOException { + QueryElasticsearchHttpTestProcessor p = new QueryElasticsearchHttpTestProcessor(); + p.setExpectedParam("myparam=myvalue"); + runner = TestRunners.newTestRunner(p); + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); + runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); + runner.setProperty(QueryElasticsearchHttp.QUERY, "source:Twitter"); + // Set dynamic property, to be added to the URL as a query parameter + runner.setProperty("myparam", "myvalue"); + runAndVerifySuccess(true); + } + /** * A Test class that extends the processor in order to inject/mock behavior */ @@ -376,6 +394,8 @@ public class TestQueryElasticsearchHttp { List pages = Arrays.asList(getDoc("query-page1.json"), getDoc("query-page2.json"), getDoc("query-page3.json")); + String expectedParam = null; + public void setExceptionToThrow(Exception exceptionToThrow) { this.exceptionToThrow = exceptionToThrow; } @@ -392,6 +412,16 @@ public class TestQueryElasticsearchHttp { this.setStatus(code, message, 1); } + /** + * Sets an query parameter (name=value) expected to be at the end of the URL for the query operation + * + * @param param + * The parameter to expect + */ + void setExpectedParam(String param) { + expectedParam = param; + } + /** * Sets the status code and message for the runNumber-th query * @@ -431,6 +461,7 @@ public class TestQueryElasticsearchHttp { @Override public Call answer(InvocationOnMock invocationOnMock) throws Throwable { Request realRequest = (Request) invocationOnMock.getArguments()[0]; + assertTrue((expectedParam == null) || (realRequest.url().toString().endsWith(expectedParam))); Response mockResponse = new Response.Builder() .request(realRequest) .protocol(Protocol.HTTP_1_1) @@ -456,8 +487,7 @@ public class TestQueryElasticsearchHttp { private static String getDoc(String filename) { try { - return IOUtils.toString(QueryElasticsearchHttp.class.getClassLoader() - .getResourceAsStream(filename)); + return IOUtils.toString(QueryElasticsearchHttp.class.getClassLoader().getResourceAsStream(filename), StandardCharsets.UTF_8); } catch (IOException e) { System.out.println("Error reading document " + filename); return ""; diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java index a1a4e8df2e..1e687f12d1 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java @@ -17,11 +17,13 @@ package org.apache.nifi.processors.elasticsearch; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -316,6 +318,24 @@ public class TestScrollElasticsearchHttp { runner.assertTransferCount(ScrollElasticsearchHttp.REL_FAILURE, 1); } + @Test + public void testScrollElasticsearchOnTrigger_withQueryParameter() throws IOException { + ScrollElasticsearchHttpTestProcessor p = new ScrollElasticsearchHttpTestProcessor(); + p.setExpectedParam("myparam=myvalue"); + runner = TestRunners.newTestRunner(p); + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc"); + runner.setProperty(ScrollElasticsearchHttp.TYPE, "status"); + runner.setProperty(ScrollElasticsearchHttp.QUERY, "source:WZ"); + runner.setProperty(ScrollElasticsearchHttp.PAGE_SIZE, "2"); + // Set dynamic property, to be added to the URL as a query parameter + runner.setProperty("myparam", "myvalue"); + runner.setIncomingConnection(false); + runAndVerifySuccess(); + } + /** * A Test class that extends the processor in order to inject/mock behavior */ @@ -332,6 +352,8 @@ public class TestScrollElasticsearchHttp { List pages = Arrays.asList(getDoc("scroll-page1.json"), getDoc("scroll-page2.json"), getDoc("scroll-page3.json")); + String expectedParam = null; + public void setExceptionToThrow(Exception exceptionToThrow) { this.exceptionToThrow = exceptionToThrow; } @@ -364,6 +386,16 @@ public class TestScrollElasticsearchHttp { this.runNumber = runNumber; } + /** + * Sets an query parameter (name=value) expected to be at the end of the URL for the query operation + * + * @param param + * The parameter to expect + */ + void setExpectedParam(String param) { + expectedParam = param; + } + @Override protected void createElasticsearchClient(ProcessContext context) throws ProcessException { client = mock(OkHttpClient.class); @@ -387,6 +419,7 @@ public class TestScrollElasticsearchHttp { @Override public Call answer(InvocationOnMock invocationOnMock) throws Throwable { Request realRequest = (Request) invocationOnMock.getArguments()[0]; + assertTrue((expectedParam == null) || (realRequest.url().toString().endsWith(expectedParam))); Response mockResponse = new Response.Builder() .request(realRequest) .protocol(Protocol.HTTP_1_1) @@ -412,8 +445,7 @@ public class TestScrollElasticsearchHttp { private static String getDoc(String filename) { try { - return IOUtils.toString(ScrollElasticsearchHttp.class.getClassLoader() - .getResourceAsStream(filename)); + return IOUtils.toString(ScrollElasticsearchHttp.class.getClassLoader().getResourceAsStream(filename), StandardCharsets.UTF_8); } catch (IOException e) { System.out.println("Error reading document " + filename); return "";