diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/README.md b/nifi-nar-bundles/nifi-elasticsearch-bundle/README.md
index ba4983ec12..2f4c186a1c 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/README.md
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/README.md
@@ -48,7 +48,8 @@ Execute the following script from the `nifi-elasticsearch-bundle` directory:
```bash
mvn --fail-at-end -Pcontrib-check clean install
-es_versions=(elasticsearch6 elasticsearch7 elasticsearch8)
+# blank entry to run the default integration-tests profile, i.e. Elasticsearch 8
+es_versions=(elasticsearch6 elasticsearch7 " ")
it_modules=(nifi-elasticsearch-client-service nifi-elasticsearch-restapi-processors)
for v in "${es_versions[@]}"; do
for m in "${it_modules[@]}"; do
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
index 77ac25d65f..9fa4fd4521 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
@@ -145,7 +145,7 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
@Test
void testVerifyFailedURL() {
runner.disableControllerService(service);
- runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, "invalid");
+ runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, "blah://invalid");
final List<ConfigVerificationResult> results = service.verify(
new MockConfigurationContext(service, getClientServiceProperties(), runner.getProcessContext().getControllerServiceLookup(), null),
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java
index 41ae54fe70..226c643b9e 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java
@@ -51,7 +51,7 @@ public abstract class AbstractByQueryElasticsearch extends AbstractProcessor imp
.build();
private static final Set<Relationship> relationships;
- private static final List<PropertyDescriptor> propertyDescriptors;
+ static final List<PropertyDescriptor> byQueryPropertyDescriptors;
private final AtomicReference<ElasticSearchClientService> clientService = new AtomicReference<>(null);
@@ -63,13 +63,16 @@ public abstract class AbstractByQueryElasticsearch extends AbstractProcessor imp
relationships = Collections.unmodifiableSet(rels);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(QUERY_DEFINITION_STYLE);
descriptors.add(QUERY);
+ descriptors.add(QUERY_CLAUSE);
+ descriptors.add(SCRIPT);
descriptors.add(QUERY_ATTRIBUTE);
descriptors.add(INDEX);
descriptors.add(TYPE);
descriptors.add(CLIENT_SERVICE);
- propertyDescriptors = Collections.unmodifiableList(descriptors);
+ byQueryPropertyDescriptors = Collections.unmodifiableList(descriptors);
}
abstract String getTookAttribute();
@@ -86,8 +89,8 @@ public abstract class AbstractByQueryElasticsearch extends AbstractProcessor imp
}
@Override
- public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return propertyDescriptors;
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return byQueryPropertyDescriptors;
}
@Override
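For context on the build-style properties wired in above, here is a minimal sketch of the body that the BUILD_QUERY definition style would assemble for an update-by-query style processor from QUERY_CLAUSE and SCRIPT. The class, field names and values are illustrative only (taken from the property descriptions), not code from this PR.

```java
// Illustrative sketch: a by-query request body combining a "query" clause and a "script",
// mirroring how getQuery(...) adds each component when its property is set.
public class ByQueryBuildStyleExample {
    public static void main(String[] args) {
        String queryClause = "{\"match\":{\"somefield\":\"somevalue\"}}";              // QUERY_CLAUSE property
        String script = "{\"source\":\"ctx._source.count++\",\"lang\":\"painless\"}";  // SCRIPT property
        String body = "{\"query\":" + queryClause + ",\"script\":" + script + "}";
        System.out.println(body);
    }
}
```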
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
index f12409b074..383c4dfd0a 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.processors.elasticsearch;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
@@ -112,7 +111,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
private static final Set<Relationship> relationships;
- private static final List<PropertyDescriptor> propertyDescriptors;
+ static final List<PropertyDescriptor> queryPropertyDescriptors;
ResultOutputStrategy hitStrategy;
private SearchResultsFormat hitFormat;
@@ -120,8 +119,6 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
private final AtomicReference<ElasticSearchClientService> clientService = new AtomicReference<>(null);
static {
@@ -133,7 +130,14 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(QUERY_DEFINITION_STYLE);
descriptors.add(QUERY);
+ descriptors.add(QUERY_CLAUSE);
+ descriptors.add(SIZE);
+ descriptors.add(SORT);
+ descriptors.add(AGGREGATIONS);
+ descriptors.add(FIELDS);
+ descriptors.add(SCRIPT_FIELDS);
descriptors.add(QUERY_ATTRIBUTE);
descriptors.add(INDEX);
descriptors.add(TYPE);
@@ -144,7 +148,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return propertyDescriptors;
+ return queryPropertyDescriptors;
}
@Override
@@ -183,8 +187,8 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java
public static final PropertyDescriptor SEARCH_RESULTS_SPLIT = new PropertyDescriptor.Builder()
@@ -77,18 +78,12 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
static final List<PropertyDescriptor> paginatedPropertyDescriptors;
static {
- final List<PropertyDescriptor> descriptors = new ArrayList<>();
- descriptors.add(QUERY_ATTRIBUTE);
- descriptors.add(INDEX);
- descriptors.add(TYPE);
- descriptors.add(CLIENT_SERVICE);
- descriptors.add(SEARCH_RESULTS_SPLIT);
- descriptors.add(SEARCH_RESULTS_FORMAT);
- descriptors.add(AGGREGATION_RESULTS_SPLIT);
- descriptors.add(AGGREGATION_RESULTS_FORMAT);
+ final List<PropertyDescriptor> descriptors = new ArrayList<>(
+ // replace SEARCH_RESULTS_SPLIT property to allow additional output strategies
+ queryPropertyDescriptors.stream().map(pd -> AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT.equals(pd) ? SEARCH_RESULTS_SPLIT : pd).collect(Collectors.toList())
+ );
descriptors.add(PAGINATION_TYPE);
descriptors.add(PAGINATION_KEEP_ALIVE);
- descriptors.add(OUTPUT_NO_HITS);
paginatedPropertyDescriptors = Collections.unmodifiableList(descriptors);
}
@@ -118,10 +113,13 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
// execute query/scroll
final String queryJson = updateQueryJson(newQuery, paginatedJsonQueryParameters);
+ final Map<String, String> requestParameters = getDynamicProperties(context, input);
if (!newQuery && paginationType == PaginationType.SCROLL) {
+ if (!requestParameters.isEmpty()) {
+ getLogger().warn("Elasticsearch _scroll API does not accept query parameters, ignoring dynamic properties {}", requestParameters.keySet());
+ }
response = clientService.get().scroll(queryJson);
} else {
- final Map<String, String> requestParameters = getDynamicProperties(context, input);
if (paginationType == PaginationType.SCROLL) {
requestParameters.put("scroll", paginatedJsonQueryParameters.getKeepAlive());
}
@@ -145,8 +143,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
);
}
- // mark the paginated query for expiry if there are no hits (no more pages to obtain so stop looping on this query)
- updatePageExpirationTimestamp(paginatedJsonQueryParameters, !response.getHits().isEmpty());
+ updateQueryParameters(paginatedJsonQueryParameters, response);
hitsFlowFiles = handleResponse(response, newQuery, paginatedJsonQueryParameters, hitsFlowFiles, session, input, stopWatch);
} while (!response.getHits().isEmpty() && (input != null || hitStrategy == ResultOutputStrategy.PER_QUERY));
@@ -268,11 +265,9 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
List<FlowFile> handleHits(final List<Map<String, Object>> hits, final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,
final ProcessSession session, final FlowFile parent, final Map<String, String> attributes,
final List<FlowFile> hitsFlowFiles, final String transitUri, final StopWatch stopWatch) throws IOException {
- paginatedJsonQueryParameters.incrementPageCount();
attributes.put("page.number", Integer.toString(paginatedJsonQueryParameters.getPageCount()));
if (hitStrategy == ResultOutputStrategy.PER_QUERY) {
-
final List<Map<String, Object>> formattedHits = formatHits(hits);
combineHits(formattedHits, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles, newQuery);
@@ -289,8 +284,11 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
return hitsFlowFiles;
}
- private void updatePageExpirationTimestamp(final PaginatedJsonQueryParameters paginatedJsonQueryParameters, final boolean hasHits) {
- final String keepAliveDuration = "PT" + (hasHits ? paginatedJsonQueryParameters.getKeepAlive() : "0s");
+ void updateQueryParameters(final PaginatedJsonQueryParameters paginatedJsonQueryParameters, final SearchResponse response) {
+ paginatedJsonQueryParameters.incrementPageCount();
+
+ // mark the paginated query for expiry if there are no hits (no more pages to obtain so stop looping on this query)
+ final String keepAliveDuration = "PT" + (!response.getHits().isEmpty() ? paginatedJsonQueryParameters.getKeepAlive() : "0s");
paginatedJsonQueryParameters.setPageExpirationTimestamp(
String.valueOf(Instant.now().plus(Duration.parse(keepAliveDuration)).toEpochMilli())
);
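The reworked updateQueryParameters derives the page expiration timestamp from the keep-alive value (or expires the query immediately when a page returns no hits). A minimal sketch of that computation, assuming a hypothetical "10s" keep-alive:

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative sketch: the page expiration is "now + keep-alive" while pages still
// return hits, and "now + 0s" (immediate expiry) once a page comes back empty.
public class PageExpirationExample {
    public static void main(String[] args) {
        boolean hasHits = true;
        String keepAlive = "10s";                                   // hypothetical Pagination Keep Alive value
        String keepAliveDuration = "PT" + (hasHits ? keepAlive : "0s");
        long expirationMillis = Instant.now().plus(Duration.parse(keepAliveDuration)).toEpochMilli();
        System.out.println(expirationMillis);
    }
}
```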
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
index 0b1e0b0447..1bbe246846 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
@@ -30,6 +31,7 @@ import org.apache.nifi.elasticsearch.IndexOperationRequest;
import org.apache.nifi.elasticsearch.IndexOperationResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -108,8 +110,6 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme
static final String BULK_HEADER_PREFIX = "BULK:";
- static final ObjectMapper MAPPER = new ObjectMapper();
-
boolean logErrors;
boolean outputErrorResponses;
boolean notFoundIsSuccessful;
@@ -201,6 +201,12 @@ public abstract class AbstractPutElasticsearch extends AbstractProcessor impleme
return validationResults;
}
+ @Override
+ public List<ConfigVerificationResult> verifyAfterIndex(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes,
+ final ElasticSearchClientService verifyClientService, final String index, final boolean indexExists) {
+ return Collections.emptyList();
+ }
+
Map<String, String> getRequestURLParameters(final Map<String, String> dynamicProperties) {
return dynamicProperties.entrySet().stream().filter(e -> !e.getKey().startsWith(BULK_HEADER_PREFIX))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
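For reference, a small sketch of how dynamic properties split under the BULK: prefix convention shown above: prefixed names go to the _bulk request headers, everything else becomes an Elasticsearch URL parameter. The property names and values here are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch: filtering URL parameters out of the full set of dynamic properties,
// mirroring getRequestURLParameters above.
public class DynamicPropertySplitExample {
    public static void main(String[] args) {
        final Map<String, String> dynamicProperties = new LinkedHashMap<>();
        dynamicProperties.put("refresh", "true");              // hypothetical URL parameter
        dynamicProperties.put("BULK:routing", "customer-1");   // hypothetical bulk request header

        final Map<String, String> urlParameters = dynamicProperties.entrySet().stream()
                .filter(e -> !e.getKey().startsWith("BULK:"))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        System.out.println(urlParameters); // {refresh=true}
    }
}
```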
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
new file mode 100644
index 0000000000..c120471788
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ConsumeElasticsearch.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.elasticsearch.SearchResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.JsonValidator;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@WritesAttributes({
+ @WritesAttribute(attribute = "mime.type", description = "application/json"),
+ @WritesAttribute(attribute = "page.number", description = "The number of the page (request), starting from 1, in which the results were returned that are in the output flowfile"),
+ @WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile"),
+ @WritesAttribute(attribute = "elasticsearch.query.error", description = "The error message provided by Elasticsearch if there is an error querying the index.")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@PrimaryNodeOnly
+@DefaultSchedule(period="1 min")
+@Tags({"elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "elasticsearch8", "query", "scroll", "page", "search", "json"})
+@CapabilityDescription("A processor that repeatedly runs a paginated query against a field using a Range query to consume new Documents from an Elasticsearch index/query. " +
+ "The processor will retrieve multiple pages of results until either no more results are available or the Pagination Keep Alive expiration is reached, " +
+ "after which the Range query will automatically update the field constraint based on the last retrieved Document value.")
+@SeeAlso({SearchElasticsearch.class, PaginatedJsonQueryElasticsearch.class})
+@DynamicProperty(
+ name = "The name of a URL query parameter to add",
+ value = "The value of the URL query parameter",
+ expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT,
+ description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " +
+ "These parameters will override any matching parameters in the query request body. " +
+ "For SCROLL type queries, these parameters are only used in the initial (first page) query as the " +
+ "Elasticsearch Scroll API does not support the same query parameters for subsequent pages of data.")
+@Stateful(scopes = Scope.CLUSTER, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, pageExpirationTimestamp, trackingRangeValue) " +
+ "is retained in between invocations of this processor until the Scroll/PiT has expired " +
+ "(when the current time is later than the last query execution plus the Pagination Keep Alive interval).")
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
+ "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+public class ConsumeElasticsearch extends SearchElasticsearch {
+ static final String STATE_RANGE_VALUE = "trackingRangeValue";
+
+ public static final PropertyDescriptor RANGE_FIELD = new PropertyDescriptor.Builder()
+ .name("es-rest-range-field")
+ .displayName("Range Query Field")
+ .description("Field to be tracked as part of an Elasticsearch Range query using a \"gt\" bound match. " +
+ "This field must exist within the Elasticsearch document for it to be retrieved.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor RANGE_FIELD_SORT_ORDER = new PropertyDescriptor.Builder()
+ .name("es-rest-sort-order")
+ .displayName("Sort Order")
+ .description("The order in which to sort the \"" + RANGE_FIELD.getDisplayName() + "\". " +
+ "A \"sort\" clause for the \"" + RANGE_FIELD.getDisplayName() +
+ "\" field will be prepended to any provided \"" + SORT.getDisplayName() + "\" clauses. " +
+ "If a \"sort\" clause already exists for the \"" + RANGE_FIELD.getDisplayName() +
+ "\" field, it will not be updated.")
+ .allowableValues("asc", "desc")
+ .defaultValue("asc")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor RANGE_INITIAL_VALUE = new PropertyDescriptor.Builder()
+ .name("es-rest-range-initial-value")
+ .displayName("Initial Value")
+ .description("The initial value to use for the query if the processor has not run previously. " +
+ "If the processor has run previously and stored a value in its state, this property will be ignored. " +
+ "If no value is provided, and the processor has not previously run, no Range query bounds will be used, " +
+ "i.e. all documents will be retrieved in the specified \"" + RANGE_FIELD_SORT_ORDER.getDisplayName() + "\".")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor RANGE_DATE_FORMAT = new PropertyDescriptor.Builder()
+ .name("es-rest-range-format")
+ .displayName(RANGE_INITIAL_VALUE.getDisplayName() + " Date Format")
+ .description("If the \"" + RANGE_FIELD.getDisplayName() + "\" is a Date field, convert the \"" + RANGE_INITIAL_VALUE.getDisplayName() + "\" to a date with this format. " +
+ "If not specified, Elasticsearch will use the date format provided by the \"" + RANGE_FIELD.getDisplayName() + "\"'s mapping. " +
+ "For valid syntax, see https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(RANGE_INITIAL_VALUE)
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor RANGE_TIME_ZONE = new PropertyDescriptor.Builder()
+ .name("es-rest-range-time-zone")
+ .displayName(RANGE_INITIAL_VALUE.getDisplayName() + " Date Time Zone")
+ .description("If the \"" + RANGE_FIELD.getDisplayName() + "\" is a Date field, convert the \"" + RANGE_INITIAL_VALUE.getDisplayName() + "\" to UTC with this time zone. " +
+ "Valid values are ISO 8601 UTC offsets, such as \"+01:00\" or \"-08:00\", and IANA time zone IDs, such as \"Europe/London\".")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(RANGE_INITIAL_VALUE)
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor ADDITIONAL_FILTERS = new PropertyDescriptor.Builder()
+ .name("es-rest-additional-filters")
+ .displayName("Additional Filters")
+ .description("One or more query filters in JSON syntax, not Lucene syntax. " +
+ "Ex: [{\"match\":{\"somefield\":\"somevalue\"}}, {\"match\":{\"anotherfield\":\"anothervalue\"}}]. " +
+ "These filters wil be used as part of a Bool query's filter.")
+ .addValidator(JsonValidator.INSTANCE)
+ .required(false)
+ .build();
+
+ private static final List<PropertyDescriptor> propertyDescriptors;
+
+ static {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(RANGE_FIELD);
+ descriptors.add(RANGE_FIELD_SORT_ORDER);
+ descriptors.add(RANGE_INITIAL_VALUE);
+ descriptors.add(RANGE_DATE_FORMAT);
+ descriptors.add(RANGE_TIME_ZONE);
+ descriptors.add(ADDITIONAL_FILTERS);
+ descriptors.addAll(scrollPropertyDescriptors.stream()
+ .filter(pd -> !QUERY.equals(pd) && !QUERY_CLAUSE.equals(pd) && !QUERY_DEFINITION_STYLE.equals(pd))
+ .collect(Collectors.toList()));
+
+ propertyDescriptors = Collections.unmodifiableList(descriptors);
+ }
+
+ protected String trackingRangeField;
+ protected String trackingSortOrder;
+
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @Override
+ Scope getStateScope() {
+ return Scope.CLUSTER;
+ }
+
+ @Override
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ super.onScheduled(context);
+
+ // set tracking field information, so it can be used to update the StateMap after query execution (where ProcessContext is not available)
+ trackingRangeField = context.getProperty(RANGE_FIELD).getValue();
+ trackingSortOrder = context.getProperty(RANGE_FIELD_SORT_ORDER).getValue();
+ }
+
+ @Override
+ @OnStopped
+ public void onStopped() {
+ super.onStopped();
+
+ // reset tracking fields, so that we don't retain incorrect values between processor restarts
+ trackingRangeField = null;
+ trackingSortOrder = null;
+ }
+
+ private String getTrackingRangeField(final ProcessContext context) {
+ final String field;
+ if (trackingRangeField != null) {
+ field = trackingRangeField;
+ } else if (context != null) {
+ field = context.getProperty(RANGE_FIELD).getValue();
+ } else {
+ field = null;
+ }
+ return field;
+ }
+
+ private String getTrackingSortOrder(final ProcessContext context) {
+ final String sortOrder;
+ if (trackingSortOrder != null) {
+ sortOrder = trackingSortOrder;
+ } else if (context != null) {
+ sortOrder = context.getProperty(RANGE_FIELD_SORT_ORDER).getValue();
+ } else {
+ sortOrder = null;
+ }
+ return sortOrder;
+ }
+
+ @Override
+ PaginatedJsonQueryParameters buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException {
+ final PaginatedJsonQueryParameters paginatedQueryJsonParameters = super.buildJsonQueryParameters(input, context, session);
+ paginatedQueryJsonParameters.setTrackingRangeValue(getTrackingRangeValueOrDefault(context));
+ return paginatedQueryJsonParameters;
+ }
+
+ @Override
+ public void addQueryClause(final Map<String, Object> query, final Map<String, String> attributes, final ProcessContext context) throws IOException {
+ final List<Map<String, Object>> filters = new ArrayList<>(10);
+
+ // only retrieve documents with values greater than the last queried value (if present)
+ final String trackingRangeValue = getTrackingRangeValueOrDefault(context);
+ if (StringUtils.isNotBlank(trackingRangeValue)) {
+ filters.add(Collections.singletonMap("range", Collections.singletonMap(getTrackingRangeField(context),
+ new HashMap<String, Object>(3, 1) {{
+ put("gt", trackingRangeValue);
+ if (context.getProperty(RANGE_DATE_FORMAT).isSet()) {
+ put("format", context.getProperty(RANGE_DATE_FORMAT).getValue());
+ }
+ if (context.getProperty(RANGE_TIME_ZONE).isSet()) {
+ put("time_zone", context.getProperty(RANGE_TIME_ZONE).getValue());
+ }
+ }})));
+ }
+
+ // add any additional filters specified as a property, allowing for one (Object) or multiple (Array of Objects) filters
+ if (context.getProperty(ADDITIONAL_FILTERS).isSet()) {
+ final JsonNode additionalFilters = mapper.readTree(context.getProperty(ADDITIONAL_FILTERS).getValue());
+ if (additionalFilters.isArray()) {
+ filters.addAll(mapper.convertValue(additionalFilters, new TypeReference<List<Map<String, Object>>>() {}));
+ } else {
+ filters.add(mapper.convertValue(additionalFilters, new TypeReference<Map<String, Object>>() {}));
+ }
+ }
+
+ if (!filters.isEmpty()) {
+ final Map<String, Object> bool = Collections.singletonMap("bool", Collections.singletonMap("filter", filters));
+ query.put("query", bool);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void addSortClause(final Map<String, Object> query, final Map<String, String> attributes, final ProcessContext context) throws IOException {
+ super.addSortClause(query, attributes, context);
+
+ final List<Map<String, Object>> sort;
+ if (query.containsKey("sort")) {
+ sort = (List<Map<String, Object>>) query.get("sort");
+ } else {
+ sort = new ArrayList<>(1);
+ query.put("sort", sort);
+ }
+
+ if (sort.stream().noneMatch(s -> s.containsKey(getTrackingRangeField(context)))) {
+ sort.add(0, Collections.singletonMap(getTrackingRangeField(context), getTrackingSortOrder(context)));
+ }
+ }
+
+ @Override
+ void additionalState(final Map<String, String> newStateMap, final PaginatedJsonQueryParameters paginatedJsonQueryParameters) {
+ newStateMap.put(STATE_RANGE_VALUE, paginatedJsonQueryParameters.getTrackingRangeValue());
+ }
+
+ @Override
+ void updateQueryParameters(final PaginatedJsonQueryParameters paginatedJsonQueryParameters, final SearchResponse response) {
+ super.updateQueryParameters(paginatedJsonQueryParameters, response);
+
+ // update the tracking range value with first/last hit (depending upon sort order)
+ if (!response.getHits().isEmpty()) {
+ final int trackingHitIndex;
+ if ("desc".equals(getTrackingSortOrder(null)) && paginatedJsonQueryParameters.getPageCount() == 1) {
+ trackingHitIndex = 0;
+ } else if ("asc".equals(getTrackingSortOrder(null))) {
+ trackingHitIndex = response.getHits().size() - 1;
+ } else {
+ return;
+ }
+
+ @SuppressWarnings("unchecked")
+ final String nextValue = String.valueOf(((Map<String, Object>) response.getHits().get(trackingHitIndex).get("_source"))
+ .get(getTrackingRangeField(null)));
+ if (StringUtils.isNotBlank(nextValue)) {
+ paginatedJsonQueryParameters.setTrackingRangeValue(nextValue);
+ }
+ }
+ }
+
+ private String getTrackingRangeValueOrDefault(final ProcessContext context) throws IOException {
+ final StateMap stateMap = context.getStateManager().getState(getStateScope());
+ return stateMap == null || stateMap.get(STATE_RANGE_VALUE) == null
+ ? context.getProperty(RANGE_INITIAL_VALUE).getValue()
+ : stateMap.get(STATE_RANGE_VALUE);
+ }
+}
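To make the new processor easier to review, here is a hedged sketch of the _search body that addQueryClause/addSortClause would produce once a tracking value is held in state and one additional filter is configured. The field names, values and the extra match filter are hypothetical; only the overall shape follows the code above.

```java
// Illustrative sketch: approximate request body built by ConsumeElasticsearch when state
// already holds a trackingRangeValue. All field names and values are hypothetical.
public class ConsumeElasticsearchBodyExample {
    public static void main(String[] args) {
        String body = "{"
                + "\"query\":{\"bool\":{\"filter\":["
                + "{\"range\":{\"timestamp\":{\"gt\":\"2023-01-01T00:00:00Z\"}}},"  // Range Query Field + stored state value
                + "{\"match\":{\"status\":\"active\"}}"                             // Additional Filters property
                + "]}},"
                + "\"sort\":[{\"timestamp\":\"asc\"}]"                              // prepended by addSortClause
                + "}";
        System.out.println(body);
    }
}
```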
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
index 414750c14b..6627f20b1d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java
@@ -23,11 +23,16 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.OperationResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
@WritesAttributes({
@WritesAttribute(attribute = "elasticsearch.delete.took", description = "The amount of time that it took to complete the delete operation in ms."),
@@ -47,6 +52,21 @@ public class DeleteByQueryElasticsearch extends AbstractByQueryElasticsearch {
static final String TOOK_ATTRIBUTE = "elasticsearch.delete.took";
static final String ERROR_ATTRIBUTE = "elasticsearch.delete.error";
+ private static final List<PropertyDescriptor> propertyDescriptors;
+
+ static {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>(
+ byQueryPropertyDescriptors.stream().filter(pd -> !SCRIPT.equals(pd)).collect(Collectors.toList())
+ );
+
+ propertyDescriptors = Collections.unmodifiableList(descriptors);
+ }
+
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
@Override
String getTookAttribute() {
return TOOK_ATTRIBUTE;
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
index bce344869c..01b3ec1a2b 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java
@@ -17,32 +17,43 @@
package org.apache.nifi.processors.elasticsearch;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticsearchException;
+import org.apache.nifi.elasticsearch.SearchResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.util.JsonValidator;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.elasticsearch.api.QueryDefinitionType;
import org.apache.nifi.util.StringUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-public interface ElasticsearchRestProcessor extends VerifiableProcessor {
+public interface ElasticsearchRestProcessor extends Processor, VerifiableProcessor {
String ATTR_RECORD_COUNT = "record.count";
String VERIFICATION_STEP_INDEX_EXISTS = "Elasticsearch Index Exists";
+ String VERIFICATION_STEP_QUERY_JSON_VALID = "Elasticsearch Query JSON Valid";
+ String VERIFICATION_STEP_QUERY_VALID = "Elasticsearch Query Valid";
PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
.name("el-rest-fetch-index")
@@ -62,11 +73,101 @@ public interface ElasticsearchRestProcessor extends VerifiableProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ PropertyDescriptor QUERY_DEFINITION_STYLE = new PropertyDescriptor.Builder()
+ .name("el-rest-query-definition-style")
+ .displayName("Query Definition Style")
+ .description("How the JSON Query will be defined for use by the processor.")
+ .required(true)
+ .allowableValues(QueryDefinitionType.class)
+ .defaultValue(QueryDefinitionType.FULL_QUERY.getValue())
+ .build();
+
PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
.name("el-rest-query")
.displayName("Query")
.description("A query in JSON syntax, not Lucene syntax. Ex: {\"query\":{\"match\":{\"somefield\":\"somevalue\"}}}. " +
- "If this parameter is not set, the query will be read from the flowfile content.")
+ "If this parameter is not set, the query will be read from the flowfile content. " +
+ "If the query (property and flowfile content) is empty, a default empty JSON Object will be used, " +
+ "which will result in a \"match_all\" query in Elasticsearch.")
+ .dependsOn(QUERY_DEFINITION_STYLE, QueryDefinitionType.FULL_QUERY)
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(JsonValidator.INSTANCE)
+ .build();
+
+ PropertyDescriptor QUERY_CLAUSE = new PropertyDescriptor.Builder()
+ .name("el-rest-query-clause")
+ .displayName("Query Clause")
+ .description("A \"query\" clause in JSON syntax, not Lucene syntax. Ex: {\"match\":{\"somefield\":\"somevalue\"}}. " +
+ "If the query is empty, a default JSON Object will be used, which will result in a \"match_all\" query in Elasticsearch.")
+ .dependsOn(QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY)
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(JsonValidator.INSTANCE)
+ .build();
+
+ PropertyDescriptor SCRIPT = new PropertyDescriptor.Builder()
+ .name("el-rest-script")
+ .displayName("Script")
+ .description("A \"script\" to execute during the operation, in JSON syntax. " +
+ "Ex: {\"source\": \"ctx._source.count++\", \"lang\": \"painless\"}")
+ .dependsOn(QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY)
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(JsonValidator.INSTANCE)
+ .build();
+
+ PropertyDescriptor SIZE = new PropertyDescriptor.Builder()
+ .name("es-rest-size")
+ .displayName("Size")
+ .description("The maximum number of documents to retrieve in the query. If the query is paginated, " +
+ "this \"size\" applies to each page of the query, not the \"size\" of the entire result set.")
+ .dependsOn(QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY)
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ PropertyDescriptor AGGREGATIONS = new PropertyDescriptor.Builder()
+ .name("es-rest-query-aggs")
+ .displayName("Aggregations")
+ .description("One or more query aggregations (or \"aggs\"), in JSON syntax. " +
+ "Ex: {\"items\": {\"terms\": {\"field\": \"product\", \"size\": 10}}}")
+ .dependsOn(QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY)
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(JsonValidator.INSTANCE)
+ .build();
+
+ PropertyDescriptor SORT = new PropertyDescriptor.Builder()
+ .name("es-rest-query-sort")
+ .displayName("Sort")
+ .description("Sort results by one or more fields, in JSON syntax. " +
+ "Ex: [{\"price\" : {\"order\" : \"asc\", \"mode\" : \"avg\"}}, {\"post_date\" : {\"format\": \"strict_date_optional_time_nanos\"}}]")
+ .dependsOn(QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY)
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(JsonValidator.INSTANCE)
+ .build();
+
+ PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
+ .name("es-rest-query-fields")
+ .displayName("Fields")
+ .description("Fields of indexed documents to be retrieved, in JSON syntax. " +
+ "Ex: [\"user.id\", \"http.response.*\", {\"field\": \"@timestamp\", \"format\": \"epoch_millis\"}]")
+ .dependsOn(QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY)
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(JsonValidator.INSTANCE)
+ .build();
+
+ PropertyDescriptor SCRIPT_FIELDS = new PropertyDescriptor.Builder()
+ .name("es-rest-query-script-fields")
+ .displayName("Script Fields")
+ .description("Fields to created using script evaluation at query runtime, in JSON syntax. " +
+ "Ex: {\"test1\": {\"script\": {\"lang\": \"painless\", \"source\": \"doc['price'].value * 2\"}}, " +
+ "\"test2\": {\"script\": {\"lang\": \"painless\", \"source\": \"doc['price'].value * params.factor\", \"params\": {\"factor\": 2.0}}}}")
+ .dependsOn(QUERY_DEFINITION_STYLE, QueryDefinitionType.BUILD_QUERY)
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(JsonValidator.INSTANCE)
@@ -109,19 +210,92 @@ public interface ElasticsearchRestProcessor extends VerifiableProcessor {
.description("All flowfiles that fail due to server/cluster availability go to this relationship.")
.build();
- default String getQuery(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException {
- String retVal = null;
- if (context.getProperty(QUERY).isSet()) {
- retVal = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
- } else if (input != null) {
- final ByteArrayOutputStream out = new ByteArrayOutputStream();
- session.exportTo(input, out);
- out.close();
+ String DEFAULT_QUERY_JSON = "{}";
- retVal = out.toString();
+ ObjectMapper mapper = new ObjectMapper();
+
+ default String getQuery(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException {
+ String retVal = getQuery(input != null ? input.getAttributes() : Collections.emptyMap(), context);
+ if (DEFAULT_QUERY_JSON.equals(retVal) && input != null
+ && QueryDefinitionType.FULL_QUERY.getValue().equals(context.getProperty(QUERY_DEFINITION_STYLE).getValue())
+ && !context.getProperty(QUERY).isSet()) {
+ try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ session.exportTo(input, out);
+ retVal = out.toString();
+ }
}
- return retVal;
+ return StringUtils.isNotBlank(retVal) ? retVal : DEFAULT_QUERY_JSON;
+ }
+
+ default String getQuery(final Map<String, String> attributes, final ProcessContext context) throws IOException {
+ final String retVal;
+ if (QueryDefinitionType.FULL_QUERY.getValue().equals(context.getProperty(QUERY_DEFINITION_STYLE).getValue())) {
+ if (context.getProperty(QUERY).isSet()) {
+ retVal = context.getProperty(QUERY).evaluateAttributeExpressions(attributes).getValue();
+ } else {
+ retVal = null;
+ }
+ } else {
+ final Map<String, Object> query = new HashMap<>(7, 1);
+ addQueryClause(query, attributes, context);
+ if (context.getProperty(SIZE).isSet()) {
+ query.put("size", context.getProperty(SIZE).evaluateAttributeExpressions(attributes).asInteger());
+ }
+ addSortClause(query, attributes, context);
+ if (context.getProperty(AGGREGATIONS).isSet()) {
+ query.put("aggs", mapper.readTree(context.getProperty(AGGREGATIONS).evaluateAttributeExpressions(attributes).getValue()));
+ }
+ if (context.getProperty(SCRIPT).isSet()) {
+ query.put("script", mapper.readTree(context.getProperty(SCRIPT).evaluateAttributeExpressions(attributes).getValue()));
+ }
+ if (context.getProperty(FIELDS).isSet()) {
+ query.put("fields", mapper.readTree(context.getProperty(FIELDS).evaluateAttributeExpressions(attributes).getValue()));
+ }
+ if (context.getProperty(SCRIPT_FIELDS).isSet()) {
+ query.put("script_fields", mapper.readTree(context.getProperty(SCRIPT_FIELDS).evaluateAttributeExpressions(attributes).getValue()));
+ }
+ retVal = mapper.writeValueAsString(query);
+ }
+
+ // allow for no query to be specified, which will run a "match_all" query in Elasticsearch by default
+ return StringUtils.isNotBlank(retVal) ? retVal : DEFAULT_QUERY_JSON;
+ }
+
+ /**
+ * Add "query" clause to the Elasticsearch query object.
+ * Overridable method for processors that build a query clause from separate components, e.g. ConsumeElasticsearch
+ *
+ * @param query the Query object being constructed
+ * @param attributes (optional) input FlowFile attributes
+ * @param context ProcessContext of the running processor
+ */
+ default void addQueryClause(final Map<String, Object> query, final Map<String, String> attributes, final ProcessContext context) throws IOException {
+ if (context.getProperty(QUERY_CLAUSE).isSet()) {
+ query.put("query", mapper.readTree(context.getProperty(QUERY_CLAUSE).evaluateAttributeExpressions(attributes).getValue()));
+ }
+ }
+
+ /**
+ * Add "sort" clause to the Elasticsearch query object.
+ * Overridable method for processors that build a sort clause from separate components, e.g. ConsumeElasticsearch
+ *
+ * @param query the Query object being constructed
+ * @param attributes (optional) input FlowFile attributes
+ * @param context ProcessContext of the running processor
+ */
+ default void addSortClause(final Map<String, Object> query, final Map<String, String> attributes, final ProcessContext context) throws IOException {
+ if (context.getProperty(SORT).isSet()) {
+ // ensure sort is specified as a List for easier manipulation if needed later
+ final List<Map<String, Object>> sortList;
+ final JsonNode sort = mapper.readTree(context.getProperty(SORT).evaluateAttributeExpressions(attributes).getValue());
+ if (sort.isArray()) {
+ sortList = mapper.convertValue(sort, new TypeReference<List<Map<String, Object>>>() {});
+ } else {
+ sortList = Collections.singletonList(mapper.convertValue(sort, new TypeReference<Map<String, Object>>() {}));
+ }
+ query.put("sort", new ArrayList<>(sortList));
+ }
}
default Map<String, String> getDynamicProperties(final ProcessContext context, final FlowFile flowFile) {
@@ -189,9 +363,51 @@ public interface ElasticsearchRestProcessor extends VerifiableProcessor {
boolean isIndexNotExistSuccessful();
- @SuppressWarnings("unused")
default List<ConfigVerificationResult> verifyAfterIndex(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes,
final ElasticSearchClientService verifyClientService, final String index, final boolean indexExists) {
- return Collections.emptyList();
+ final List<ConfigVerificationResult> results = new ArrayList<>();
+ final ConfigVerificationResult.Builder queryJsonValidResult = new ConfigVerificationResult.Builder()
+ .verificationStepName(VERIFICATION_STEP_QUERY_JSON_VALID);
+ final ConfigVerificationResult.Builder queryValidResult = new ConfigVerificationResult.Builder()
+ .verificationStepName(VERIFICATION_STEP_QUERY_VALID);
+
+ if (indexExists) {
+ try {
+ final String query = getQuery(attributes, context);
+ verificationLogger.debug("Query JSON: {}", query);
+ final ObjectNode queryJson = mapper.readValue(query, ObjectNode.class);
+ queryJsonValidResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation("Query JSON successfully parsed");
+
+ if (queryJson.has("script")) {
+ verificationLogger.debug("Removing \"script\" field from verification Query, not valid for _search");
+ queryJson.remove("script");
+ }
+ final String type = context.getProperty(TYPE).evaluateAttributeExpressions(attributes).getValue();
+ final Map<String, String> requestParameters = new HashMap<>(getDynamicProperties(context, attributes));
+ requestParameters.putIfAbsent("_source", "false");
+
+ final SearchResponse response = verifyClientService.search(mapper.writeValueAsString(queryJson), index, type, requestParameters);
+ queryValidResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+ .explanation(String.format("Query found %d hits and %d aggregations in %d milliseconds, timed out: %s",
+ response.getNumberOfHits(), response.getAggregations() == null ? 0 : response.getAggregations().size(), response.getTook(), response.isTimedOut()));
+ } catch (final IOException ioe) {
+ verificationLogger.warn("Unable to parse Query as JSON", ioe);
+ queryJsonValidResult.outcome(ConfigVerificationResult.Outcome.FAILED)
+ .explanation(String.format("Query cannot be parsed as valid JSON: %s", ioe.getMessage()));
+ queryValidResult.outcome(ConfigVerificationResult.Outcome.SKIPPED).explanation("Query JSON could not be parsed");
+ } catch (final ElasticsearchException ee) {
+ verificationLogger.warn("Query failed in Elasticsearch", ee);
+ queryValidResult.outcome(ConfigVerificationResult.Outcome.FAILED)
+ .explanation(String.format("Query failed in Elasticsearch: %s", ee.getMessage()));
+ }
+ } else {
+ final String skippedReason = String.format("Index %s does not exist", index);
+ queryJsonValidResult.outcome(ConfigVerificationResult.Outcome.SKIPPED).explanation(skippedReason);
+ queryValidResult.outcome(ConfigVerificationResult.Outcome.SKIPPED).explanation(skippedReason);
+ }
+ results.add(queryJsonValidResult.build());
+ results.add(queryValidResult.build());
+
+ return results;
}
}
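As a reviewer aid, a minimal sketch of what the new BUILD_QUERY definition style assembles when several component properties are set, mirroring getQuery(attributes, context). The values reuse the examples from the property descriptions and the class is illustrative only, not part of the interface.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: combining QUERY_CLAUSE, SIZE, SORT and AGGREGATIONS into one JSON body.
public class BuildQueryStyleSketch {
    public static void main(String[] args) throws Exception {
        final ObjectMapper mapper = new ObjectMapper();
        final Map<String, Object> query = new HashMap<>();
        query.put("query", mapper.readTree("{\"match\":{\"somefield\":\"somevalue\"}}"));      // QUERY_CLAUSE
        query.put("size", 10);                                                                  // SIZE
        query.put("sort", mapper.readTree("[{\"price\":{\"order\":\"asc\"}}]"));                // SORT
        query.put("aggs", mapper.readTree("{\"items\":{\"terms\":{\"field\":\"product\"}}}"));  // AGGREGATIONS
        System.out.println(mapper.writeValueAsString(query));
    }
}
```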
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
index 13f578fa9b..d61ccc6f6d 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java
@@ -17,12 +17,12 @@
package org.apache.nifi.processors.elasticsearch;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
@@ -65,6 +65,7 @@ import java.util.concurrent.atomic.AtomicReference;
@WritesAttribute(attribute = "elasticsearch.type", description = "The Elasticsearch document type"),
@WritesAttribute(attribute = "elasticsearch.get.error", description = "The error message provided by Elasticsearch if there is an error fetching the document.")
})
+@SeeAlso(JsonQueryElasticsearch.class)
@DynamicProperty(
name = "The name of a URL query parameter to add",
value = "The value of the URL query parameter",
@@ -129,8 +130,6 @@ public class GetElasticsearch extends AbstractProcessor implements Elasticsearch
REL_DOC, REL_FAILURE, REL_RETRY, REL_NOT_FOUND
)));
- private final ObjectMapper mapper = new ObjectMapper();
-
private final AtomicReference<ElasticSearchClientService> clientService = new AtomicReference<>(null);
@Override
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
index 58e3ad0524..3a82862446 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java
@@ -21,6 +21,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.elasticsearch.SearchResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -47,6 +48,7 @@ import java.util.concurrent.TimeUnit;
"Elasticsearch JSON DSL. It does not automatically paginate queries for the user. If an incoming relationship is added to this " +
"processor, it will use the flowfile's content for the query. Care should be taken on the size of the query because the entire response " +
"from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
+@SeeAlso(PaginatedJsonQueryElasticsearch.class)
@DynamicProperty(
name = "The name of a URL query parameter to add",
value = "The value of the URL query parameter",
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
index 553d338dbe..2f529a35f0 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearch.java
@@ -23,6 +23,7 @@ import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.elasticsearch.SearchResponse;
@@ -32,15 +33,13 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "application/json"),
@WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
@WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
- @WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
+ @WritesAttribute(attribute = "page.number", description = "The number of the page (request), starting from 1, in which the results were returned that are in the output flowfile"),
@WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile"),
@WritesAttribute(attribute = "elasticsearch.query.error", description = "The error message provided by Elasticsearch if there is an error querying the index.")
})
@@ -49,6 +48,7 @@ import java.util.List;
@CapabilityDescription("A processor that allows the user to run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
"It will use the flowfile's content for the query unless the QUERY attribute is populated. " +
"Search After/Point in Time queries must include a valid \"sort\" field.")
+@SeeAlso({JsonQueryElasticsearch.class, ConsumeElasticsearch.class, SearchElasticsearch.class})
@DynamicProperty(
name = "The name of a URL query parameter to add",
value = "The value of the URL query parameter",
@@ -60,19 +60,9 @@ import java.util.List;
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
"from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
public class PaginatedJsonQueryElasticsearch extends AbstractPaginatedJsonQueryElasticsearch {
- private static final List<PropertyDescriptor> propertyDescriptors;
-
- static {
- final List<PropertyDescriptor> descriptors = new ArrayList<>();
- descriptors.add(QUERY);
- descriptors.addAll(paginatedPropertyDescriptors);
-
- propertyDescriptors = Collections.unmodifiableList(descriptors);
- }
-
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return propertyDescriptors;
+ return paginatedPropertyDescriptors;
}
@Override
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
index 36ea292697..1038b83e95 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
@@ -18,7 +18,6 @@
package org.apache.nifi.processors.elasticsearch;
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.DynamicProperties;
import org.apache.nifi.annotation.behavior.DynamicProperty;
@@ -28,6 +27,7 @@ import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
@@ -64,6 +64,7 @@ import java.util.stream.Collectors;
description = "The error message if there is an issue parsing the FlowFile, sending the parsed document to Elasticsearch or parsing the Elasticsearch response"),
@WritesAttribute(attribute = "elasticsearch.bulk.error", description = "The _bulk response if there was an error during processing the document within Elasticsearch.")
})
+@SeeAlso(PutElasticsearchRecord.class)
@DynamicProperties({
@DynamicProperty(
name = "The name of the Bulk request header",
@@ -179,7 +180,6 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
)));
private boolean outputErrors;
- private final ObjectMapper objectMapper = new ObjectMapper();
@Override
Set<Relationship> getBaseRelationships() {
@@ -261,7 +261,7 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
try (final InputStream inStream = session.read(input)) {
final byte[] result = IOUtils.toByteArray(inStream);
@SuppressWarnings("unchecked")
- final Map<String, Object> contentMap = objectMapper.readValue(new String(result, charset), Map.class);
+ final Map<String, Object> contentMap = mapper.readValue(new String(result, charset), Map.class);
final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.forValue(indexOp);
operations.add(new IndexOperationRequest(index, type, id, contentMap, o, scriptMap, scriptedUpsert, dynamicTemplatesMap, bulkHeaderFields));
@@ -284,7 +284,7 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
private Map<String, Object> getMapFromAttribute(final PropertyDescriptor propertyDescriptor, final ProcessContext context, final FlowFile input) {
final String dynamicTemplates = context.getProperty(propertyDescriptor).evaluateAttributeExpressions(input).getValue();
try {
- return StringUtils.isNotBlank(dynamicTemplates) ? MAPPER.readValue(dynamicTemplates, Map.class) : Collections.emptyMap();
+ return StringUtils.isNotBlank(dynamicTemplates) ? mapper.readValue(dynamicTemplates, Map.class) : Collections.emptyMap();
} catch (final JsonProcessingException jpe) {
throw new ProcessException(propertyDescriptor.getDisplayName() + " must be a String parsable into a JSON Object", jpe);
}
@@ -300,7 +300,7 @@ public class PutElasticsearchJson extends AbstractPutElasticsearch {
errors.forEach((index, error) -> {
String errorMessage;
try {
- errorMessage = objectMapper.writeValueAsString(error);
+ errorMessage = mapper.writeValueAsString(error);
} catch (JsonProcessingException e) {
errorMessage = String.format(
"{\"error\": {\"type\": \"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index c46197bde0..952193842c 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -26,6 +26,7 @@ import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
@@ -93,6 +94,7 @@ import java.util.stream.Collectors;
@WritesAttribute(attribute = "elasticsearch.put.success.count", description = "The number of records that were successfully processed by the Elasticsearch _bulk API."),
@WritesAttribute(attribute = "elasticsearch.bulk.error", description = "The _bulk response if there was an error during processing the record within Elasticsearch.")
})
+@SeeAlso(PutElasticsearchJson.class)
@DynamicProperties({
@DynamicProperty(
name = "The name of the Bulk request header",
@@ -460,7 +462,7 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
stopWatch.getDuration(TimeUnit.MILLISECONDS)
);
- input = session.putAllAttributes(input, new HashMap<>() {{
+ input = session.putAllAttributes(input, new HashMap<String, String>() {{
put("elasticsearch.put.error.count", String.valueOf(erroredRecords.get()));
put("elasticsearch.put.success.count", String.valueOf(successfulRecords.get()));
}});
@@ -666,7 +668,7 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
map = DataTypeUtils.toMap(fieldValue.getValue(), path.getPath());
} else {
try {
- map = MAPPER.readValue(fieldValue.getValue().toString(), Map.class);
+ map = mapper.readValue(fieldValue.getValue().toString(), Map.class);
} catch (final JsonProcessingException jpe) {
getLogger().error("Unable to parse field {} as Map", path.getPath(), jpe);
throw new ProcessException(
@@ -825,7 +827,7 @@ public class PutElasticsearchRecord extends AbstractPutElasticsearch {
this.writer.write(record);
if (errorType != null && exampleError == null && error != null) {
try {
- exampleError = MAPPER.writeValueAsString(error);
+ exampleError = mapper.writeValueAsString(error);
} catch (JsonProcessingException e) {
exampleError = String.format(
"{\"error\": {\"type\": \"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
index 85ea331273..46d5f4ed76 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java
@@ -27,6 +27,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
@@ -50,12 +51,13 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
@WritesAttributes({
@WritesAttribute(attribute = "mime.type", description = "application/json"),
@WritesAttribute(attribute = "aggregation.name", description = "The name of the aggregation whose results are in the output flowfile"),
@WritesAttribute(attribute = "aggregation.number", description = "The number of the aggregation whose results are in the output flowfile"),
- @WritesAttribute(attribute = "page.number", description = "The number of the page (request) in which the results were returned that are in the output flowfile"),
+ @WritesAttribute(attribute = "page.number", description = "The number of the page (request), starting from 1, in which the results were returned that are in the output flowfile"),
@WritesAttribute(attribute = "hit.count", description = "The number of hits that are in the output flowfile"),
@WritesAttribute(attribute = "elasticsearch.query.error", description = "The error message provided by Elasticsearch if there is an error querying the index.")
})
@@ -68,10 +70,11 @@ import java.util.Set;
"Search After/Point in Time queries must include a valid \"sort\" field. The processor will retrieve multiple pages of results " +
"until either no more results are available or the Pagination Keep Alive expiration is reached, after which the query will " +
"restart with the first page of results being retrieved.")
+@SeeAlso({PaginatedJsonQueryElasticsearch.class, ConsumeElasticsearch.class})
@DynamicProperty(
name = "The name of a URL query parameter to add",
value = "The value of the URL query parameter",
- expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
+ expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT,
description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " +
"These parameters will override any matching parameters in the query request body. " +
"For SCROLL type queries, these parameters are only used in the initial (first page) query as the " +
@@ -90,14 +93,13 @@ public class SearchElasticsearch extends AbstractPaginatedJsonQueryElasticsearch
static final String STATE_HIT_COUNT = "hitCount";
static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder().fromPropertyDescriptor(ElasticsearchRestProcessor.QUERY)
- .name("el-rest-query")
- .description("A query in JSON syntax, not Lucene syntax. Ex: {\"query\":{\"match\":{\"somefield\":\"somevalue\"}}}.")
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .required(true)
+ .description("A query in JSON syntax, not Lucene syntax. Ex: {\"query\":{\"match\":{\"somefield\":\"somevalue\"}}}. " +
+ "If the query is empty, a default JSON Object will be used, which will result in a \"match_all\" query in Elasticsearch.")
.build();
private static final Set<Relationship> relationships;
- private static final List<PropertyDescriptor> propertyDescriptors;
+
+ static final List<PropertyDescriptor> scrollPropertyDescriptors;
static {
final Set<Relationship> rels = new HashSet<>();
@@ -106,10 +108,14 @@ public class SearchElasticsearch extends AbstractPaginatedJsonQueryElasticsearch
relationships = Collections.unmodifiableSet(rels);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
- descriptors.add(SearchElasticsearch.QUERY);
- descriptors.addAll(paginatedPropertyDescriptors);
-
- propertyDescriptors = Collections.unmodifiableList(descriptors);
+ // ensure QUERY_DEFINITION_STYLE first for consistency between Elasticsearch processors
+ descriptors.add(QUERY_DEFINITION_STYLE);
+ descriptors.add(QUERY);
+ descriptors.addAll(paginatedPropertyDescriptors.stream().filter(
+ // override QUERY to change description (no FlowFile content used by SearchElasticsearch)
+ pd -> !ElasticsearchRestProcessor.QUERY.equals(pd) && !QUERY_DEFINITION_STYLE.equals(pd)
+ ).collect(Collectors.toList()));
+ scrollPropertyDescriptors = descriptors;
}
@Override
@@ -118,15 +124,19 @@ public class SearchElasticsearch extends AbstractPaginatedJsonQueryElasticsearch
}
@Override
- public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return propertyDescriptors;
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return scrollPropertyDescriptors;
+ }
+
+ Scope getStateScope() {
+ return Scope.LOCAL;
}
@Override
PaginatedJsonQueryParameters buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException {
final PaginatedJsonQueryParameters paginatedQueryJsonParameters = super.buildJsonQueryParameters(input, context, session);
- final StateMap stateMap = context.getStateManager().getState(Scope.LOCAL);
+ final StateMap stateMap = context.getStateManager().getState(getStateScope());
paginatedQueryJsonParameters.setHitCount(stateMap.get(STATE_HIT_COUNT) == null ? 0 : Integer.parseInt(stateMap.get(STATE_HIT_COUNT)));
paginatedQueryJsonParameters.setPageCount(stateMap.get(STATE_PAGE_COUNT) == null ? 0 : Integer.parseInt(stateMap.get(STATE_PAGE_COUNT)));
paginatedQueryJsonParameters.setScrollId(stateMap.get(STATE_SCROLL_ID));
@@ -138,15 +148,16 @@ public class SearchElasticsearch extends AbstractPaginatedJsonQueryElasticsearch
}
@Override
- void finishQuery(final FlowFile input, final PaginatedJsonQueryParameters paginatedQueryJsonParameters,
+ void finishQuery(final FlowFile input, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,
final ProcessSession session, final ProcessContext context, final SearchResponse response) throws IOException {
- if (response.getHits().isEmpty()) {
- getLogger().debug("No more results for paginated query, resetting local state for future queries");
- resetProcessorState(context);
- } else {
- getLogger().debug("Updating local state for next execution");
+ final Map<String, String> newStateMap = new HashMap<>(10, 1);
+ additionalState(newStateMap, paginatedJsonQueryParameters);
+
+ if (response.getHits().isEmpty()) {
+ getLogger().debug("No more results for paginated query, resetting state for future queries");
+ } else {
+ getLogger().debug("Updating state for next execution");
- final Map<String, String> newStateMap = new HashMap<>();
if (paginationType == PaginationType.SCROLL) {
newStateMap.put(STATE_SCROLL_ID, response.getScrollId());
} else {
@@ -156,11 +167,15 @@ public class SearchElasticsearch extends AbstractPaginatedJsonQueryElasticsearch
newStateMap.put(STATE_PIT_ID, response.getPitId());
}
}
- newStateMap.put(STATE_HIT_COUNT, Integer.toString(paginatedQueryJsonParameters.getHitCount()));
- newStateMap.put(STATE_PAGE_COUNT, Integer.toString(paginatedQueryJsonParameters.getPageCount()));
- newStateMap.put(STATE_PAGE_EXPIRATION_TIMESTAMP, paginatedQueryJsonParameters.getPageExpirationTimestamp());
- context.getStateManager().setState(newStateMap, Scope.LOCAL);
+ newStateMap.put(STATE_HIT_COUNT, Integer.toString(paginatedJsonQueryParameters.getHitCount()));
+ newStateMap.put(STATE_PAGE_COUNT, Integer.toString(paginatedJsonQueryParameters.getPageCount()));
+ newStateMap.put(STATE_PAGE_EXPIRATION_TIMESTAMP, paginatedJsonQueryParameters.getPageExpirationTimestamp());
}
+ updateProcessorState(context, newStateMap);
+ }
+
+ void additionalState(final Map<String, String> newStateMap, final PaginatedJsonQueryParameters paginatedJsonQueryParameters) {
+ // intentionally blank, allows ConsumeElasticsearch to track range value between sessions
}
@Override
@@ -168,10 +183,13 @@ public class SearchElasticsearch extends AbstractPaginatedJsonQueryElasticsearch
final SearchResponse response) throws IOException {
final boolean expiredQuery = StringUtils.isNotEmpty(paginatedJsonQueryParameters.getPageExpirationTimestamp())
&& Instant.ofEpochMilli(Long.parseLong(paginatedJsonQueryParameters.getPageExpirationTimestamp())).isBefore(Instant.now());
+
if (expiredQuery) {
getLogger().debug("Existing paginated query has expired, resetting for new query");
- resetProcessorState(context);
+ final Map<String, String> newStateMap = new HashMap<>(1, 1);
+ additionalState(newStateMap, paginatedJsonQueryParameters);
+ updateProcessorState(context, newStateMap);
paginatedJsonQueryParameters.setPageCount(0);
paginatedJsonQueryParameters.setHitCount(0);
@@ -186,20 +204,24 @@ public class SearchElasticsearch extends AbstractPaginatedJsonQueryElasticsearch
@Override
String getScrollId(final ProcessContext context, final SearchResponse response) throws IOException {
return response == null || StringUtils.isBlank(response.getScrollId())
- ? context.getStateManager().getState(Scope.LOCAL).get(STATE_SCROLL_ID)
+ ? context.getStateManager().getState(getStateScope()).get(STATE_SCROLL_ID)
: response.getScrollId();
}
@Override
String getPitId(final ProcessContext context, final SearchResponse response) throws IOException {
return response == null || StringUtils.isBlank(response.getScrollId())
- ? context.getStateManager().getState(Scope.LOCAL).get(STATE_PIT_ID)
+ ? context.getStateManager().getState(getStateScope()).get(STATE_PIT_ID)
: response.getPitId();
}
- private void resetProcessorState(final ProcessContext context) throws IOException {
+ void updateProcessorState(final ProcessContext context, final Map<String, String> newStateMap) throws IOException {
// using ProcessContext#stateManager instead of ProcessSession#*State methods because the latter don't
// seem to persist things properly between sessions if the processor is scheduled to run very quickly, e.g. every second (NIFI-9050)
- context.getStateManager().clear(Scope.LOCAL);
+ if (newStateMap == null || newStateMap.isEmpty()) {
+ context.getStateManager().clear(getStateScope());
+ } else {
+ context.getStateManager().setState(newStateMap, getStateScope());
+ }
}
}
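
The new package-private hooks above (`getStateScope`, `additionalState`, `updateProcessorState`) exist so that a subclass can persist extra tracking data alongside the pagination state. Below is a minimal sketch of how such a subclass might use them; the class name and state key are hypothetical and this is not the actual ConsumeElasticsearch implementation, only an illustration under those assumptions.

```java
package org.apache.nifi.processors.elasticsearch;

import org.apache.nifi.components.state.Scope;
import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;

import java.util.Map;

// Hypothetical subclass illustrating the new extension points in SearchElasticsearch.
public class RangeTrackingSearchSketch extends SearchElasticsearch {
    static final String STATE_RANGE_VALUE = "trackingRangeValue"; // hypothetical state key

    @Override
    Scope getStateScope() {
        // shared across the cluster so any node can resume the paginated query
        return Scope.CLUSTER;
    }

    @Override
    void additionalState(final Map<String, String> newStateMap,
                         final PaginatedJsonQueryParameters paginatedJsonQueryParameters) {
        // carry the tracked range value forward between query executions
        if (paginatedJsonQueryParameters.getTrackingRangeValue() != null) {
            newStateMap.put(STATE_RANGE_VALUE, paginatedJsonQueryParameters.getTrackingRangeValue());
        }
    }
}
```

Because `finishQuery` and the expiry reset both call `additionalState` before persisting via `updateProcessorState`, the tracked value survives even when the pagination entries themselves are cleared.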
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginatedJsonQueryParameters.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginatedJsonQueryParameters.java
index 8e97f21b65..68c34fae0f 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginatedJsonQueryParameters.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/PaginatedJsonQueryParameters.java
@@ -24,6 +24,7 @@ public class PaginatedJsonQueryParameters extends JsonQueryParameters {
private String pitId = null;
private String pageExpirationTimestamp = null;
private String keepAlive;
+ private String trackingRangeValue;
public int getPageCount() {
return pageCount;
@@ -76,4 +77,12 @@ public class PaginatedJsonQueryParameters extends JsonQueryParameters {
public void setKeepAlive(final String keepAlive) {
this.keepAlive = keepAlive;
}
+
+ public String getTrackingRangeValue() {
+ return trackingRangeValue;
+ }
+
+ public void setTrackingRangeValue(String trackingRangeValue) {
+ this.trackingRangeValue = trackingRangeValue;
+ }
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/QueryDefinitionType.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/QueryDefinitionType.java
new file mode 100644
index 0000000000..e4a3557912
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/api/QueryDefinitionType.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch.api;
+
+import org.apache.nifi.components.DescribedValue;
+
+import java.util.Arrays;
+
+public enum QueryDefinitionType implements DescribedValue {
+ FULL_QUERY("full", "Provide the full Query."),
+ BUILD_QUERY("build", "Build the Query from separate JSON objects.");
+
+ private final String value;
+ private final String description;
+
+ QueryDefinitionType(final String value, final String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public String getDisplayName() {
+ return name();
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+
+ public static QueryDefinitionType fromValue(final String value) {
+ return Arrays.stream(QueryDefinitionType.values()).filter(v -> v.getValue().equals(value)).findFirst()
+ .orElseThrow(() -> new IllegalArgumentException(String.format("Unknown value %s", value)));
+ }
+}
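
As a quick illustration of the `DescribedValue` contract above, the enum round-trips between the value stored in processor configuration and its display name and description. This is only a usage sketch; the class name is hypothetical.

```java
import org.apache.nifi.processors.elasticsearch.api.QueryDefinitionType;

public class QueryDefinitionTypeSketch {
    public static void main(final String[] args) {
        // resolve the enum from the value persisted in the processor configuration
        final QueryDefinitionType type = QueryDefinitionType.fromValue("build");

        System.out.println(type.getDisplayName());  // BUILD_QUERY
        System.out.println(type.getDescription());  // Build the Query from separate JSON objects.

        // an unknown value fails fast with an IllegalArgumentException
        try {
            QueryDefinitionType.fromValue("unknown");
        } catch (final IllegalArgumentException e) {
            System.out.println(e.getMessage());     // Unknown value unknown
        }
    }
}
```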
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index a22d16aae2..bcdd3ab459 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -17,8 +17,8 @@ org.apache.nifi.processors.elasticsearch.DeleteByQueryElasticsearch
org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch
org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch
org.apache.nifi.processors.elasticsearch.SearchElasticsearch
+org.apache.nifi.processors.elasticsearch.ConsumeElasticsearch
org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord
org.apache.nifi.processors.elasticsearch.PutElasticsearchJson
org.apache.nifi.processors.elasticsearch.UpdateByQueryElasticsearch
org.apache.nifi.processors.elasticsearch.GetElasticsearch
-
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.ConsumeElasticsearch/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.ConsumeElasticsearch/additionalDetails.html
new file mode 100644
index 0000000000..23ed422bfa
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.ConsumeElasticsearch/additionalDetails.html
@@ -0,0 +1,79 @@
+ ConsumeElasticsearch
+
+ This processor is intended for use with the Elasticsearch JSON DSL and Elasticsearch 5.X and newer. It is designed
+ to be able to create a JSON query using input properties and execute it against an Elasticsearch cluster in a paginated manner.
+ Like all processors in the "restapi" bundle, it uses the official Elastic client APIs, so it supports leader detection.
+ The query is paginated in Elasticsearch using one of the available methods - "Scroll" or "Search After" (optionally
+ with a "Point in Time" for Elasticsearch 7.10+ with XPack enabled). The number of results per page can be controlled using
+ the size property.
+ Results will be sorted on the field that is to be tracked, with the sort order set as a property.
+ Search results and aggregation results can be split up into multiple flowfiles. Aggregation results
+ will only be split at the top level because nested aggregations lose their context (and thus lose their value) if
+ separated from their parent aggregation. Additionally, the results from all pages can be combined into a single
+ flowfile (but the processor will only load each page of data into memory at any one time).
+ The following is an example query that would be created for tracking an "@timestamp" field:
+
+ {
+     "size": 10000,
+     "sort": {"@timestamp": "desc"},
+     "query": {
+         "bool": {
+             "filter": [
+                 {"range": {"@timestamp": {"gt": "2023-09-01"}}}
+             ]
+         }
+     }
+ }
+
+ Additional "filter" entries can be added as a JSON string in the query filter property, for example:
+
+ [
+     {"term": {"department": "accounts"}},
+     {"term": {"title.keyword": "Annual Report"}}
+ ]
+
+
+ Query Pagination Across Processor Executions
+ This processor runs on a schedule in order to execute the same query repeatedly. Once a paginated query has been
+ initiated within Elasticsearch, this processor will continue to retrieve results for that same query until no
+ further results are available. After that point, a new paginated query will be initiated using the same Query JSON,
+ but with the "range" filter query's "gt" field set to the last value obtained from previous results.
+ If the results are "Combined" from this processor, then the paginated query will run continually within a
+ single invocation until no more results are available (then the processor will start a new paginated query upon its
+ next invocation). If the results are "Split" or "Per Page", then each invocation of this processor will retrieve the
+ next page of results until either there are no more results or the paginated query expires within Elasticsearch.
+
+ Resetting Queries / Clearing Processor State
+ Cluster State is used to track the progress of a paginated query within this processor. If there is need to restart
+ the query completely or change the processor configuration after a paginated query has already been started,
+ be sure to "Clear State" of the processor once it has been stopped and before restarting.
+ Note that clearing the processor's state discards the tracked range value, so the processor will no longer know which documents have already been retrieved.
+ Update the "Initial Value" property with an appropriate value before restarting the processor if it should continue from where it left off.
+
+ Duplicate Results
+ This processor does not attempt to de-duplicate results between queries, for example if the same query runs twice
+ and (some or all of) the results are identical, the output will contain these same results for both invocations.
+ This might happen if the NiFi Primary Node changes while a page of data is being retrieved, or if the processor's
+ state is cleared and the processor is then restarted.
+
+
\ No newline at end of file
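
To make the filter-merging behaviour described in the documentation above concrete, here is a hedged Jackson sketch, not the processor's actual code, showing how user-supplied filter entries (a JSON array string) could be appended to the generated "range" filter inside the "bool"/"filter" array. The class name and variable names are illustrative only.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class FilterMergeSketch {
    public static void main(final String[] args) throws Exception {
        final ObjectMapper mapper = new ObjectMapper();

        // top-level request body: size, sort and the generated range filter for the tracked field
        final ObjectNode body = mapper.createObjectNode();
        body.put("size", 10000);
        body.putObject("sort").put("@timestamp", "desc");
        final ArrayNode filters = body.putObject("query").putObject("bool").putArray("filter");
        filters.add(mapper.readTree("{\"range\": {\"@timestamp\": {\"gt\": \"2023-09-01\"}}}"));

        // user-supplied additional filter entries, appended after the generated range filter
        final String additionalFilters = "[{\"term\": {\"department\": \"accounts\"}}]";
        mapper.readTree(additionalFilters).forEach(filters::add);

        System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(body));
    }
}
```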
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/additionalDetails.html
index 127632590b..944a4640ed 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/additionalDetails.html
@@ -27,7 +27,7 @@
If the Query Attribute property is configured, the executed query JSON will be placed in the attribute provided by this property.
The query is paginated in Elasticsearch using one of the available methods - "Scroll" or "Search After" (optionally
with a "Point in Time" for Elasticsearch 7.10+ with XPack enabled). The number of results per page can be controlled using
- the size parameter in the Query JSON. For Search After functionality, a sort parameter must
+ the size parameter in the Query JSON. For Search After functionality, a sort parameter must
be present within the Query JSON.
Search results and aggregation results can be split up into multiple flowfiles. Aggregation results
will only be split at the top level because nested aggregations lose their context (and thus lose their value) if
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.SearchElasticsearch/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.SearchElasticsearch/additionalDetails.html
index d4a0530489..aeb32b5506 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.SearchElasticsearch/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.SearchElasticsearch/additionalDetails.html
@@ -60,17 +60,27 @@
}
}
+
+
Query Pagination Across Processor Executions
This processor runs on a schedule in order to execute the same query repeatedly. Once a paginated query has been
initiated within Elasticsearch, this processor will continue to retrieve results for that same query until no
- further results are available. After that point, a new paginated query will be initiated using the same Query JSON.
- This processor does not attempt to de-duplicate results between queries, for example if the same query runs twice
- and (some or all of) the results are identical, the output will contain these same results for both invocations.
+ further results are available. After that point, a new paginated query will be initiated using the same Query JSON.
If the results are "Combined" from this processor, then the paginated query will run continually within a
single invocation until no more results are available (then the processor will start a new paginated query upon its
next invocation). If the results are "Split" or "Per Page", then each invocation of this processor will retrieve the
next page of results until either there are no more results or the paginated query expires within Elasticsearch.
+
+ Resetting Queries / Clearing Processor State
Local State is used to track the progress of a paginated query within this processor. If there is need to restart
the query completely or change the processor configuration after a paginated query has already been started,
be sure to "Clear State" of the processor once it has been stopped and before restarting.
+
+ Duplicate Results
+ This processor does not attempt to de-duplicate results between queries, for example if the same query runs twice
+ and (some or all of) the results are identical, the output will contain these same results for both invocations.
+ This might happen if the NiFi Primary Node changes while a page of data is being retrieved, or if the processor's
+ state is cleared and the processor is then restarted.
+ This processor will continually run the same query unless its properties are updated, so if the data in
+ Elasticsearch has not changed, the same data will be retrieved multiple times.