NIFI-10844 Allow _source only output for GetElasticsearch and JsonQueryElasticsearch

This closes #6687

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Chris Sampson 2022-11-19 20:51:24 +00:00 committed by exceptionfactory
parent c11092b2b4
commit d9420afb60
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
18 changed files with 548 additions and 192 deletions

View File

@ -604,7 +604,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final String body = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
parseResponseWarningHeaders(response);
return (Map<String, Object>) mapper.readValue(body, Map.class).get("_source");
return (Map<String, Object>) mapper.readValue(body, Map.class).getOrDefault("_source", Collections.emptyMap());
} catch (final Exception ex) {
throw new ElasticsearchException(ex);
}

View File

@ -55,6 +55,7 @@ import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@ -205,63 +206,29 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
@Test
void testBasicSearch() throws Exception {
final Map<String, Object> temp = new MapBuilder()
.of("size", 10, "query", new MapBuilder().of("match_all", new HashMap<>()).build(),
"aggs", new MapBuilder()
.of("term_counts", new MapBuilder()
.of("terms", new MapBuilder()
.of("field", "msg", "size", 5)
.build())
.build())
.build())
.build();
final String query = prettyJson(temp);
final SearchResponse response = service.search(query, INDEX, type, null);
assertNotNull(response, "Response was null");
assertEquals(15, response.getNumberOfHits(), "Wrong count");
assertFalse(response.isTimedOut(), "Timed out");
assertNotNull(response.getHits(), "Hits was null");
assertEquals(10, response.getHits().size(), "Wrong number of hits");
assertNotNull(response.getAggregations(), "Aggregations are missing");
assertEquals(1, response.getAggregations().size(), "Aggregation count is wrong");
assertNull(response.getScrollId(), "Unexpected ScrollId");
assertNull(response.getSearchAfter(), "Unexpected Search_After");
assertNull(response.getPitId(), "Unexpected pitId");
@SuppressWarnings("unchecked") final Map<String, Object> termCounts = (Map<String, Object>) response.getAggregations().get("term_counts");
assertNotNull(termCounts, "Term counts was missing");
@SuppressWarnings("unchecked") final List<Map<String, Object>> buckets = (List<Map<String, Object>>) termCounts.get("buckets");
assertNotNull(buckets, "Buckets branch was empty");
final Map<String, Object> expected = new MapBuilder()
.of("one", 1, "two", 2, "three", 3,
"four", 4, "five", 5)
.build();
buckets.forEach( aggRes -> {
final String key = (String) aggRes.get("key");
final Integer docCount = (Integer) aggRes.get("doc_count");
assertEquals(expected.get(key), docCount, "${key} did not match.");
});
assertBasicSearch(null);
}
@Test
void testBasicSearchRequestParameters() throws Exception {
// same assertions as the basic search, but sent with an explicit URL request parameter
// ("preference=_local") to verify parameters are passed through to Elasticsearch
assertBasicSearch(createParameters("preference", "_local"));
}
private void assertBasicSearch(final Map<String, String> requestParameters) throws JsonProcessingException {
final Map<String, Object> temp = new MapBuilder()
.of("size", 10, "query", new MapBuilder().of("match_all", new HashMap<>()).build(),
"aggs", new MapBuilder()
.of("term_counts", new MapBuilder()
.of("terms", new MapBuilder()
.of("field", "msg", "size", 5)
.of("term_counts", new MapBuilder()
.of("terms", new MapBuilder()
.of("field", "msg", "size", 5)
.build())
.build())
.build())
.build())
.build();
final String query = prettyJson(temp);
final SearchResponse response = service.search(query, "messages", type, createParameters("preference", "_local"));
final SearchResponse response = service.search(query, "messages", type, requestParameters);
assertNotNull(response, "Response was null");
assertEquals(15, response.getNumberOfHits(), "Wrong count");
@ -287,6 +254,47 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
});
}
@SuppressWarnings("unchecked")
@Test
void testSearchEmptySource() throws Exception {
    // Filter "_source" down to a field that does not exist: every hit should still
    // carry a "_source" entry, but it must be an empty Map.
    final Map<String, Object> queryMap = new MapBuilder()
            .of("size", 2,
                    "query", new MapBuilder().of("match_all", new HashMap<>()).build())
            .build();
    final String queryJson = prettyJson(queryMap);

    final SearchResponse response = service.search(queryJson, "messages", type, createParameters("_source", "not_exists"));
    assertNotNull(response, "Response was null");
    assertNotNull(response.getHits(), "Hits was null");
    assertEquals(2, response.getHits().size(), "Wrong number of hits");

    for (final Map<String, Object> hit : response.getHits()) {
        final Object source = hit.get("_source");
        assertInstanceOf(Map.class, source, "Source not a Map");
        assertTrue(((Map<String, Object>) source).isEmpty(), "Source not empty");
    }
}
@Test
void testSearchNoSource() throws Exception {
    // Query an index whose mapping disables "_source": hits should contain
    // metadata only and no "_source" key at all.
    final Map<String, Object> queryMap = new MapBuilder()
            .of("size", 1,
                    "query", new MapBuilder().of("match_all", new HashMap<>()).build())
            .build();
    final String queryJson = prettyJson(queryMap);

    final SearchResponse response = service.search(queryJson, "no_source", type, null);
    assertNotNull(response, "Response was null");
    assertNotNull(response.getHits(), "Hits was null");
    assertEquals(1, response.getHits().size(), "Wrong number of hits");

    for (final Map<String, Object> hit : response.getHits()) {
        assertFalse(hit.isEmpty(), "Hit was empty");
        assertFalse(hit.containsKey("_source"), "Hit contained _source");
    }
}
@Test
void testV6SearchWarnings() throws JsonProcessingException {
assumeTrue(getElasticMajorVersion() == 6, "Requires Elasticsearch 6");
@ -590,6 +598,19 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
}
}
@Test
void testGetEmptySource() {
    // A "_source" filter matching no fields should yield an empty (never null) document
    final Map<String, String> sourceFilter = Collections.singletonMap("_source", "not_exist");
    final Map<String, Object> result = service.get(INDEX, type, "1", sourceFilter);
    assertNotNull(result, "Doc was null");
    assertTrue(result.isEmpty(), "Doc was not empty");
}
@Test
void testGetNoSource() {
    // Getting a document from an index with "_source" disabled should yield an
    // empty (never null) document rather than throwing
    final Map<String, Object> result = service.get("no_source", type, "1", null);
    assertNotNull(result, "Doc was null");
    assertTrue(result.isEmpty(), "Doc was not empty");
}
@Test
void testGetNotFound() {
final ElasticsearchException ee = assertThrows(ElasticsearchException.class, () -> service.get(INDEX, type, "not_found", null));

View File

@ -22,6 +22,7 @@ PUT:bulk_a/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
PUT:bulk_b/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
PUT:bulk_c/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
PUT:error_handler:{ "mappings": { "_doc": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}}}
PUT:no_source/:{ "mappings":{"_doc":{ "_source": { "enabled": false }, "properties":{ "msg":{"type":"keyword"}}}}}
#add document
PUT:messages/_doc/1:{ "msg":"one" }
@ -45,4 +46,4 @@ PUT:user_details/_doc/2:{ "email": "jane.doe@company.com", "phone": "098-765-432
PUT:nested/_doc/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
PUT:nested/_doc/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
PUT:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
PUT:no_source/_doc/1:{ "msg":"none" }

View File

@ -22,6 +22,7 @@ PUT:bulk_a/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
PUT:bulk_b/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
PUT:bulk_c/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
PUT:error_handler:{ "mappings": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}}
PUT:no_source/:{ "mappings":{ "_source": { "enabled": false }, "properties":{ "msg":{"type":"keyword"}}}}
#add document
POST:messages/_doc/1:{ "msg":"one" }
@ -45,4 +46,4 @@ POST:user_details/_doc/2:{ "email": "jane.doe@company.com", "phone": "098-765-43
POST:nested/_doc/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
POST:nested/_doc/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
POST:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
PUT:no_source/_doc/1:{ "msg":"none" }

View File

@ -22,6 +22,7 @@ PUT:bulk_a/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
PUT:bulk_b/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
PUT:bulk_c/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
PUT:error_handler:{ "mappings": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}}
PUT:no_source/:{ "mappings":{ "_source": { "enabled": false }, "properties":{ "msg":{"type":"keyword"}}}}
#add document
POST:messages/_doc/1:{ "msg":"one" }
@ -45,4 +46,4 @@ POST:user_details/_doc/2:{ "email": "jane.doe@company.com", "phone": "098-765-43
POST:nested/_doc/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
POST:nested/_doc/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
POST:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
PUT:no_source/_doc/1:{ "msg":"none" }

View File

@ -19,7 +19,6 @@ package org.apache.nifi.processors.elasticsearch;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticsearchException;
@ -32,7 +31,10 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.api.AggregationResultsFormat;
import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy;
import org.apache.nifi.processors.elasticsearch.api.SearchResultsFormat;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
@ -41,11 +43,13 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParameters> extends AbstractProcessor implements ElasticsearchRestProcessor {
public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
@ -57,23 +61,22 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
.description("Aggregations are routed to this relationship.")
.build();
public static final AllowableValue FLOWFILE_PER_HIT = new AllowableValue(
"splitUp-yes",
"Per Hit",
"Flowfile per hit."
);
public static final AllowableValue FLOWFILE_PER_RESPONSE = new AllowableValue(
"splitUp-no",
"Per Response",
"Flowfile per response."
);
public static final PropertyDescriptor SEARCH_RESULTS_SPLIT = new PropertyDescriptor.Builder()
.name("el-rest-split-up-hits")
.displayName("Search Results Split")
.description("Output a flowfile containing all hits or one flowfile for each individual hit.")
.allowableValues(FLOWFILE_PER_RESPONSE, FLOWFILE_PER_HIT)
.defaultValue(FLOWFILE_PER_RESPONSE.getValue())
.allowableValues(ResultOutputStrategy.PER_RESPONSE.getValue(), ResultOutputStrategy.PER_HIT.getValue())
.defaultValue(ResultOutputStrategy.PER_RESPONSE.getValue())
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final PropertyDescriptor SEARCH_RESULTS_FORMAT = new PropertyDescriptor.Builder()
.name("el-rest-format-hits")
.displayName("Search Results Format")
.description("Format of Hits output.")
.allowableValues(SearchResultsFormat.class)
.defaultValue(SearchResultsFormat.FULL.getValue())
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
@ -81,8 +84,18 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
.name("el-rest-split-up-aggregations")
.displayName("Aggregation Results Split")
.description("Output a flowfile containing all aggregations or one flowfile for each individual aggregation.")
.allowableValues(FLOWFILE_PER_RESPONSE, FLOWFILE_PER_HIT)
.defaultValue(FLOWFILE_PER_RESPONSE.getValue())
.allowableValues(ResultOutputStrategy.PER_RESPONSE.getValue(), ResultOutputStrategy.PER_HIT.getValue())
.defaultValue(ResultOutputStrategy.PER_RESPONSE.getValue())
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final PropertyDescriptor AGGREGATION_RESULTS_FORMAT = new PropertyDescriptor.Builder()
.name("el-rest-format-aggregations")
.displayName("Aggregation Results Format")
.description("Format of Aggregation output.")
.allowableValues(AggregationResultsFormat.class)
.defaultValue(AggregationResultsFormat.FULL.getValue())
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
@ -101,14 +114,12 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
String splitUpHits;
private String splitUpAggregations;
ResultOutputStrategy hitStrategy;
private SearchResultsFormat hitFormat;
private ResultOutputStrategy aggregationStrategy;
private AggregationResultsFormat aggregationFormat;
private boolean outputNoHits;
boolean getOutputNoHits() {
return outputNoHits;
}
final ObjectMapper mapper = new ObjectMapper();
final AtomicReference<ElasticSearchClientService> clientService = new AtomicReference<>(null);
@ -128,7 +139,9 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
descriptors.add(TYPE);
descriptors.add(CLIENT_SERVICE);
descriptors.add(SEARCH_RESULTS_SPLIT);
descriptors.add(SEARCH_RESULTS_FORMAT);
descriptors.add(AGGREGATION_RESULTS_SPLIT);
descriptors.add(AGGREGATION_RESULTS_FORMAT);
descriptors.add(OUTPUT_NO_HITS);
propertyDescriptors = Collections.unmodifiableList(descriptors);
@ -160,12 +173,18 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
return false;
}
boolean isOutputNoHits() {
return outputNoHits;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
clientService.set(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
splitUpHits = context.getProperty(SEARCH_RESULTS_SPLIT).getValue();
splitUpAggregations = context.getProperty(AGGREGATION_RESULTS_SPLIT).getValue();
hitStrategy = ResultOutputStrategy.fromValue(context.getProperty(SEARCH_RESULTS_SPLIT).getValue());
hitFormat = SearchResultsFormat.valueOf(context.getProperty(SEARCH_RESULTS_FORMAT).getValue());
aggregationStrategy = ResultOutputStrategy.fromValue(context.getProperty(AGGREGATION_RESULTS_SPLIT).getValue());
aggregationFormat = AggregationResultsFormat.valueOf(context.getProperty(AGGREGATION_RESULTS_FORMAT).getValue());
outputNoHits = context.getProperty(OUTPUT_NO_HITS).asBoolean();
}
@ -203,7 +222,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
input = session.putAttribute(input, "elasticsearch.query.error", ese.getMessage());
session.transfer(input, ese.isElastic() ? REL_RETRY : REL_FAILURE);
}
} catch (Exception ex) {
} catch (final Exception ex) {
getLogger().error("Could not query documents.", ex);
if (input != null) {
input = session.putAttribute(input, "elasticsearch.query.error", ex.getMessage());
@ -260,17 +279,19 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
final FlowFile parent, final Map<String, String> attributes,
final String transitUri, final StopWatch stopWatch) throws IOException {
if (aggregations != null && !aggregations.isEmpty()) {
final Map<String, Object> formattedAggregations = formatAggregations(aggregations);
final List<FlowFile> aggsFlowFiles = new ArrayList<>();
if (splitUpAggregations.equals(FLOWFILE_PER_HIT.getValue())) {
if (aggregationStrategy == ResultOutputStrategy.PER_HIT) {
int aggCount = 0;
for (final Map.Entry<String, Object> agg : aggregations.entrySet()) {
for (final Map.Entry<String, Object> agg : formattedAggregations.entrySet()) {
final FlowFile aggFlowFile = createChildFlowFile(session, parent);
final String aggJson = mapper.writeValueAsString(agg.getValue());
aggsFlowFiles.add(writeAggregationFlowFileContents(agg.getKey(), ++aggCount, aggJson, session, aggFlowFile, attributes));
}
} else {
final FlowFile aggFlowFile = createChildFlowFile(session, parent);
final String json = mapper.writeValueAsString(aggregations);
final String json = mapper.writeValueAsString(formattedAggregations);
aggsFlowFiles.add(writeAggregationFlowFileContents(null, null, json, session, aggFlowFile, attributes));
}
@ -281,8 +302,29 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
}
}
@SuppressWarnings("unchecked")
private Map<String, Object> formatAggregations(final Map<String, Object> aggregations) {
    // Reshape the raw Elasticsearch aggregations map according to the configured
    // Aggregation Results Format (FULL passes the map through untouched).
    final Map<String, Object> formattedAggregations;
    if (aggregationFormat == AggregationResultsFormat.METADATA_ONLY) {
        // Copy each aggregation before stripping "buckets": a shallow copy of the outer
        // map would remove "buckets" from the caller's original nested maps as a side effect.
        formattedAggregations = new LinkedHashMap<>(aggregations.size());
        for (final Map.Entry<String, Object> agg : aggregations.entrySet()) {
            final Map<String, Object> metadata = new LinkedHashMap<>((Map<String, Object>) agg.getValue());
            metadata.remove("buckets");
            formattedAggregations.put(agg.getKey(), metadata);
        }
    } else if (aggregationFormat == AggregationResultsFormat.BUCKETS_ONLY) {
        // getOrDefault guards against aggregations without "buckets" (e.g. metric
        // aggregations): Collectors.toMap throws NullPointerException on null values.
        formattedAggregations = aggregations.entrySet().stream().collect(Collectors.toMap(
                Map.Entry::getKey,
                e -> ((Map<String, Object>) e.getValue()).getOrDefault("buckets", Collections.emptyList()),
                (existing, replacement) -> existing,
                LinkedHashMap::new
        ));
    } else {
        formattedAggregations = aggregations;
    }
    return formattedAggregations;
}
FlowFile writeHitFlowFile(final int count, final String json, final ProcessSession session,
final FlowFile hitFlowFile, final Map<String, String> attributes) {
final FlowFile hitFlowFile, final Map<String, String> attributes) {
final FlowFile ff = session.write(hitFlowFile, out -> out.write(json.getBytes()));
attributes.put("hit.count", Integer.toString(count));
@ -301,16 +343,18 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
final FlowFile parent, final Map<String, String> attributes, final List<FlowFile> hitsFlowFiles,
final String transitUri, final StopWatch stopWatch) throws IOException {
if (hits != null && !hits.isEmpty()) {
if (FLOWFILE_PER_HIT.getValue().equals(splitUpHits)) {
for (final Map<String, Object> hit : hits) {
final List<Map<String, Object>> formattedHits = formatHits(hits);
if (hitStrategy == ResultOutputStrategy.PER_HIT) {
for (final Map<String, Object> hit : formattedHits) {
final FlowFile hitFlowFile = createChildFlowFile(session, parent);
final String json = mapper.writeValueAsString(hit);
hitsFlowFiles.add(writeHitFlowFile(1, json, session, hitFlowFile, attributes));
}
} else {
final FlowFile hitFlowFile = createChildFlowFile(session, parent);
final String json = mapper.writeValueAsString(hits);
hitsFlowFiles.add(writeHitFlowFile(hits.size(), json, session, hitFlowFile, attributes));
final String json = mapper.writeValueAsString(formattedHits);
hitsFlowFiles.add(writeHitFlowFile(formattedHits.size(), json, session, hitFlowFile, attributes));
}
} else if (newQuery && outputNoHits) {
final FlowFile hitFlowFile = createChildFlowFile(session, parent);
@ -322,6 +366,24 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
return hitsFlowFiles;
}
@SuppressWarnings("unchecked")
private List<Map<String, Object>> formatHits(final List<Map<String, Object>> hits) {
    // Reshape each hit according to the configured Search Results Format
    // (FULL passes the hits through untouched).
    if (hitFormat == SearchResultsFormat.METADATA_ONLY) {
        // copy each hit before removing "_source" so the originals are not mutated
        final List<Map<String, Object>> metadataOnly = new ArrayList<>(hits.size());
        for (final Map<String, Object> hit : hits) {
            final Map<String, Object> copy = new HashMap<>(hit);
            copy.remove("_source");
            metadataOnly.add(copy);
        }
        return metadataOnly;
    }

    if (hitFormat == SearchResultsFormat.SOURCE_ONLY) {
        // a hit without "_source" (e.g. from an index with _source disabled) becomes an empty Map
        final List<Map<String, Object>> sourceOnly = new ArrayList<>(hits.size());
        for (final Map<String, Object> hit : hits) {
            sourceOnly.add((Map<String, Object>) hit.getOrDefault("_source", Collections.emptyMap()));
        }
        return sourceOnly;
    }

    return hits;
}
private void transferResultFlowFiles(final ProcessSession session, final List<FlowFile> hitsFlowFiles, final String transitUri,
final StopWatch stopWatch) {
// output any results

View File

@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.elasticsearch.SearchResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
@ -30,6 +29,8 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
import org.apache.nifi.processors.elasticsearch.api.PaginationType;
import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
@ -43,45 +44,20 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJsonQueryElasticsearch<PaginatedJsonQueryParameters> {
public static final AllowableValue FLOWFILE_PER_QUERY = new AllowableValue(
"splitUp-query",
"Per Query",
"Combine results from all query responses (one flowfile per entire paginated result set of hits). " +
"Note that aggregations cannot be paged, they are generated across the entire result set and " +
"returned as part of the first page. Results are output with one JSON object per line " +
"(allowing hits to be combined from multiple pages without loading all results into memory)."
);
public static final PropertyDescriptor SEARCH_RESULTS_SPLIT = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT)
.description("Output a flowfile containing all hits or one flowfile for each individual hit " +
"or one flowfile containing all hits from all paged responses.")
.allowableValues(FLOWFILE_PER_RESPONSE, FLOWFILE_PER_HIT, FLOWFILE_PER_QUERY)
.allowableValues(ResultOutputStrategy.class)
.build();
public static final AllowableValue PAGINATION_SEARCH_AFTER = new AllowableValue(
"pagination-search_after",
"Search After",
"Use Elasticsearch \"search_after\" to page sorted results."
);
public static final AllowableValue PAGINATION_POINT_IN_TIME = new AllowableValue(
"pagination-pit",
"Point in Time",
"Use Elasticsearch (7.10+ with XPack) \"point in time\" to page sorted results."
);
public static final AllowableValue PAGINATION_SCROLL = new AllowableValue(
"pagination-scroll",
"Scroll",
"Use Elasticsearch \"scroll\" to page results."
);
public static final PropertyDescriptor PAGINATION_TYPE = new PropertyDescriptor.Builder()
.name("el-rest-pagination-type")
.displayName("Pagination Type")
.description("Pagination method to use. Not all types are available for all Elasticsearch versions, " +
"check the Elasticsearch docs to confirm which are applicable and recommended for your service.")
.allowableValues(PAGINATION_SCROLL, PAGINATION_SEARCH_AFTER, PAGINATION_POINT_IN_TIME)
.defaultValue(PAGINATION_SCROLL.getValue())
.allowableValues(PaginationType.class)
.defaultValue(PaginationType.SCROLL.getValue())
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
@ -106,7 +82,9 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
descriptors.add(TYPE);
descriptors.add(CLIENT_SERVICE);
descriptors.add(SEARCH_RESULTS_SPLIT);
descriptors.add(SEARCH_RESULTS_FORMAT);
descriptors.add(AGGREGATION_RESULTS_SPLIT);
descriptors.add(AGGREGATION_RESULTS_FORMAT);
descriptors.add(PAGINATION_TYPE);
descriptors.add(PAGINATION_KEEP_ALIVE);
descriptors.add(OUTPUT_NO_HITS);
@ -117,14 +95,14 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
// output as newline delimited JSON (allows for multiple pages of results to be appended to existing FlowFiles without retaining all hits in memory)
private final ObjectWriter writer = mapper.writer().withRootValueSeparator("\n");
String paginationType;
PaginationType paginationType;
@Override
@OnScheduled
public void onScheduled(final ProcessContext context) {
super.onScheduled(context);
paginationType = context.getProperty(PAGINATION_TYPE).getValue();
paginationType = PaginationType.fromValue(context.getProperty(PAGINATION_TYPE).getValue());
}
@Override
@ -139,18 +117,18 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
// execute query/scroll
final String queryJson = updateQueryJson(newQuery, paginatedJsonQueryParameters);
if (!newQuery && PAGINATION_SCROLL.getValue().equals(paginationType)) {
if (!newQuery && paginationType == PaginationType.SCROLL) {
response = clientService.get().scroll(queryJson);
} else {
final Map<String, String> requestParameters = getUrlQueryParameters(context, input);
if (PAGINATION_SCROLL.getValue().equals(paginationType)) {
if (paginationType == PaginationType.SCROLL) {
requestParameters.put("scroll", paginatedJsonQueryParameters.getKeepAlive());
}
response = clientService.get().search(
queryJson,
// Point in Time uses general /_search API not /index/_search
PAGINATION_POINT_IN_TIME.getValue().equals(paginationType) ? null : paginatedJsonQueryParameters.getIndex(),
paginationType == PaginationType.POINT_IN_TIME ? null : paginatedJsonQueryParameters.getIndex(),
paginatedJsonQueryParameters.getType(),
requestParameters
);
@ -170,7 +148,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
updatePageExpirationTimestamp(paginatedJsonQueryParameters, !response.getHits().isEmpty());
hitsFlowFiles = handleResponse(response, newQuery, paginatedJsonQueryParameters, hitsFlowFiles, session, input, stopWatch);
} while (!response.getHits().isEmpty() && (input != null || FLOWFILE_PER_QUERY.getValue().equals(splitUpHits)));
} while (!response.getHits().isEmpty() && (input != null || hitStrategy == ResultOutputStrategy.PER_QUERY));
if (response.getHits().isEmpty()) {
getLogger().debug("No more results for paginated query, clearing Elasticsearch resources");
@ -199,7 +177,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
private void prepareNextPageQuery(final ObjectNode queryJson, final PaginatedJsonQueryParameters paginatedJsonQueryParameters) throws IOException {
// prepare to get next page of results (depending on pagination type)
if (PAGINATION_SCROLL.getValue().equals(paginationType)) {
if (paginationType == PaginationType.SCROLL) {
// overwrite query JSON with existing Scroll details
queryJson.removeAll().put("scroll_id", paginatedJsonQueryParameters.getScrollId());
if (StringUtils.isNotBlank(paginatedJsonQueryParameters.getKeepAlive())) {
@ -222,13 +200,13 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
if (!newQuery) {
prepareNextPageQuery(queryJson, paginatedJsonQueryParameters);
} else if ((PAGINATION_POINT_IN_TIME.getValue().equals(paginationType) || PAGINATION_SEARCH_AFTER.getValue().equals(paginationType))
} else if ((paginationType == PaginationType.POINT_IN_TIME || paginationType == PaginationType.SEARCH_AFTER)
&& !queryJson.has("sort")) {
// verify query contains a "sort" field if pit/search_after requested
throw new IllegalArgumentException("Query using pit/search_after must contain a \"sort\" field");
}
if (PAGINATION_POINT_IN_TIME.getValue().equals(paginationType)) {
if (paginationType == PaginationType.POINT_IN_TIME) {
// add pit_id to query JSON
final String queryPitId = newQuery
? clientService.get().initialisePointInTime(paginatedJsonQueryParameters.getIndex(), paginatedJsonQueryParameters.getKeepAlive())
@ -273,7 +251,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
hitsFlowFiles.add(writeCombinedHitFlowFile(paginatedJsonQueryParameters.getHitCount() + hits.size(),
hits, session, hitFlowFile, attributes, append));
} else if (getOutputNoHits()) {
} else if (isOutputNoHits()) {
final FlowFile hitFlowFile = createChildFlowFile(session, parent);
hitsFlowFiles.add(writeHitFlowFile(0, "", session, hitFlowFile, attributes));
}
@ -292,7 +270,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
paginatedJsonQueryParameters.incrementPageCount();
attributes.put("page.number", Integer.toString(paginatedJsonQueryParameters.getPageCount()));
if (FLOWFILE_PER_QUERY.getValue().equals(splitUpHits)) {
if (hitStrategy == ResultOutputStrategy.PER_QUERY) {
combineHits(hits, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles);
// output results if it seems we've combined all available results (i.e. no hits in this page and therefore no more expected)
@ -317,20 +295,20 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
void clearElasticsearchState(final ProcessContext context, final SearchResponse response) {
try {
if (PAGINATION_SCROLL.getValue().equals(paginationType)) {
if (paginationType == PaginationType.SCROLL) {
final String scrollId = getScrollId(context, response);
if (StringUtils.isNotBlank(scrollId)) {
clientService.get().deleteScroll(scrollId);
}
} else if (PAGINATION_POINT_IN_TIME.getValue().equals(paginationType)) {
} else if (paginationType == PaginationType.POINT_IN_TIME) {
final String pitId = getPitId(context, response);
if (StringUtils.isNotBlank(pitId)) {
clientService.get().deletePointInTime(pitId);
}
}
} catch (Exception ex) {
} catch (final Exception ex) {
getLogger().warn("Error while cleaning up Elasticsearch pagination resources, ignoring", ex);
}
}

View File

@ -38,6 +38,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.elasticsearch.api.PaginatedJsonQueryParameters;
import org.apache.nifi.processors.elasticsearch.api.PaginationType;
import org.apache.nifi.util.StringUtils;
import java.io.IOException;
@ -144,12 +145,12 @@ public class SearchElasticsearch extends AbstractPaginatedJsonQueryElasticsearch
getLogger().debug("Updating local state for next execution");
final Map<String, String> newStateMap = new HashMap<>();
if (PAGINATION_SCROLL.getValue().equals(paginationType)) {
if (paginationType == PaginationType.SCROLL) {
newStateMap.put(STATE_SCROLL_ID, response.getScrollId());
} else {
newStateMap.put(STATE_SEARCH_AFTER, response.getSearchAfter());
if (PAGINATION_POINT_IN_TIME.getValue().equals(paginationType)) {
if (paginationType == PaginationType.POINT_IN_TIME) {
newStateMap.put(STATE_PIT_ID, response.getPitId());
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch.api;
import org.apache.nifi.components.DescribedValue;
/**
 * Output formats for Elasticsearch Aggregation results attached to a query response.
 * The enum constant name doubles as both the allowable value and its display name.
 */
public enum AggregationResultsFormat implements DescribedValue {
    FULL("Contains full Elasticsearch Aggregation, including Buckets and Metadata."),
    BUCKETS_ONLY("Bucket Content only."),
    METADATA_ONLY("Aggregation Metadata only.");

    // Human-readable explanation surfaced in the processor property's allowable-value list.
    private final String description;

    AggregationResultsFormat(final String description) {
        this.description = description;
    }

    @Override
    public String getDisplayName() {
        // display name intentionally mirrors the constant name
        return name();
    }

    @Override
    public String getValue() {
        // stored property value is the constant name itself
        return name();
    }

    @Override
    public String getDescription() {
        return description;
    }
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch.api;
import org.apache.nifi.components.DescribedValue;
import java.util.Arrays;
/**
 * Pagination mechanisms supported when querying Elasticsearch.
 * Each constant carries the persisted property value and a user-facing description.
 */
public enum PaginationType implements DescribedValue {
    SCROLL("pagination-scroll", "Use Elasticsearch \"scroll\" to page results."),
    SEARCH_AFTER("pagination-search_after", "Use Elasticsearch \"search_after\" to page sorted results."),
    POINT_IN_TIME("pagination-pit", "Use Elasticsearch (7.10+ with XPack) \"point in time\" to page sorted results.");

    // Stable property value persisted in flow configuration.
    private final String value;
    // Human-readable explanation shown alongside the allowable value.
    private final String description;

    PaginationType(final String value, final String description) {
        this.value = value;
        this.description = description;
    }

    @Override
    public String getValue() {
        return value;
    }

    @Override
    public String getDisplayName() {
        // display name intentionally mirrors the constant name
        return name();
    }

    @Override
    public String getDescription() {
        return description;
    }

    /**
     * Resolves the enum constant matching the given persisted property value.
     *
     * @param value the stored property value, e.g. {@code pagination-scroll}
     * @return the matching {@link PaginationType}
     * @throws IllegalArgumentException if no constant uses the given value
     */
    public static PaginationType fromValue(final String value) {
        for (final PaginationType paginationType : values()) {
            if (paginationType.getValue().equals(value)) {
                return paginationType;
            }
        }
        throw new IllegalArgumentException(String.format("Unknown value %s", value));
    }
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch.api;
import org.apache.nifi.components.DescribedValue;
import java.util.Arrays;
/**
 * Strategies for packaging Elasticsearch query results into FlowFiles.
 * Each constant carries the persisted property value and a user-facing description.
 */
public enum ResultOutputStrategy implements DescribedValue {
    PER_HIT("splitUp-yes", "Flowfile per hit."),
    PER_RESPONSE("splitUp-no", "Flowfile per response."),
    PER_QUERY("splitUp-query", "Combine results from all query responses (one flowfile per entire paginated result set of hits). " +
            "Note that aggregations cannot be paged, they are generated across the entire result set and " +
            "returned as part of the first page. Results are output with one JSON object per line " +
            "(allowing hits to be combined from multiple pages without loading all results into memory).");

    // Stable property value persisted in flow configuration.
    private final String value;
    // Human-readable explanation shown alongside the allowable value.
    private final String description;

    ResultOutputStrategy(final String value, final String description) {
        this.value = value;
        this.description = description;
    }

    @Override
    public String getValue() {
        return value;
    }

    @Override
    public String getDisplayName() {
        // display name intentionally mirrors the constant name
        return name();
    }

    @Override
    public String getDescription() {
        return description;
    }

    /**
     * Resolves the enum constant matching the given persisted property value.
     *
     * @param value the stored property value, e.g. {@code splitUp-yes}
     * @return the matching {@link ResultOutputStrategy}
     * @throws IllegalArgumentException if no constant uses the given value
     */
    public static ResultOutputStrategy fromValue(final String value) {
        for (final ResultOutputStrategy strategy : values()) {
            if (strategy.getValue().equals(value)) {
                return strategy;
            }
        }
        throw new IllegalArgumentException(String.format("Unknown value %s", value));
    }
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch.api;
import org.apache.nifi.components.DescribedValue;
/**
 * Output formats for individual Elasticsearch search Hits.
 * The enum constant name doubles as both the allowable value and its display name.
 */
public enum SearchResultsFormat implements DescribedValue {
    FULL("Contains full Elasticsearch Hit, including Document Source and Metadata."),
    SOURCE_ONLY("Document Source only (where present)."),
    METADATA_ONLY("Hit Metadata only.");

    // Human-readable explanation surfaced in the processor property's allowable-value list.
    private final String description;

    SearchResultsFormat(final String description) {
        this.description = description;
    }

    @Override
    public String getDisplayName() {
        // display name intentionally mirrors the constant name
        return name();
    }

    @Override
    public String getValue() {
        // stored property value is the constant name itself
        return name();
    }

    @Override
    public String getDescription() {
        return description;
    }
}

View File

@ -17,8 +17,12 @@
package org.apache.nifi.processors.elasticsearch
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.nifi.components.state.Scope
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.processors.elasticsearch.api.AggregationResultsFormat
import org.apache.nifi.processors.elasticsearch.api.SearchResultsFormat
import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy
import org.apache.nifi.provenance.ProvenanceEventType
import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.TestRunner
@ -31,10 +35,15 @@ import static org.hamcrest.CoreMatchers.equalTo
import static org.hamcrest.CoreMatchers.is
import static org.hamcrest.MatcherAssert.assertThat
import static org.junit.jupiter.api.Assertions.assertEquals
import static org.junit.jupiter.api.Assertions.assertFalse
import static org.junit.jupiter.api.Assertions.assertInstanceOf
import static org.junit.jupiter.api.Assertions.assertNotNull
import static org.junit.jupiter.api.Assertions.assertThrows
import static org.junit.jupiter.api.Assertions.assertTrue
abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryElasticsearch> {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
static final String INDEX_NAME = "messages"
abstract P getProcessor()
@ -87,8 +96,8 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, "not-boolean")
final String expectedAllowedSplitHits = processor instanceof AbstractPaginatedJsonQueryElasticsearch
? [AbstractJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY].join(", ")
: [AbstractJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT].join(", ")
? ResultOutputStrategy.values().collect {r -> r.getValue()}.join(", ")
: [ResultOutputStrategy.PER_RESPONSE.getValue(), ResultOutputStrategy.PER_HIT.getValue()].join(", ")
final AssertionError assertionError = assertThrows(AssertionError.class, runner.&run)
assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 8 validation failures:\n" +
@ -106,7 +115,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
AbstractJsonQueryElasticsearch.TYPE.getName(), AbstractJsonQueryElasticsearch.TYPE.getName(),
AbstractJsonQueryElasticsearch.CLIENT_SERVICE.getDisplayName(),
AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT.getName(), expectedAllowedSplitHits,
AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT.getName(), [AbstractJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT].join(", "),
AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT.getName(), [ResultOutputStrategy.PER_RESPONSE.getValue(), ResultOutputStrategy.PER_HIT.getValue()].join(", "),
AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS.getName(),
AbstractJsonQueryElasticsearch.CLIENT_SERVICE.getDisplayName()
)))
@ -114,14 +123,22 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
@Test
void testBasicQuery() throws Exception {
// test hits (no splitting)
// test hits (no splitting) - full hit format
final TestRunner runner = createRunner(false)
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [ match_all: [:] ]])))
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.FULL.getValue())
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
final FlowFile hits = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0)
hits.assertAttributeEquals("hit.count", "10")
assertOutputContent(hits.getContent(), 10, false)
final List<Map<String, Object>> result = OBJECT_MAPPER.readValue(hits.getContent(), List.class)
result.forEach({ hit ->
final Map<String, Object> h = ((Map<String, Object>)hit)
assertFalse(h.isEmpty())
assertTrue(h.containsKey("_source"))
assertTrue(h.containsKey("_index"))
})
assertThat(
runner.getProvenanceEvents().stream().filter({ pe ->
pe.getEventType() == ProvenanceEventType.RECEIVE &&
@ -132,14 +149,46 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
reset(runner)
// test splitting hits
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT)
// test splitting hits - _source only format
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.SOURCE_ONLY.getValue())
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 10, 0, 0)
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach({ hit ->
hit.assertAttributeEquals("hit.count", "1")
assertOutputContent(hit.getContent(), 1, false)
final Map<String, Object> h = OBJECT_MAPPER.readValue(hit.getContent(), Map.class)
assertFalse(h.isEmpty())
assertFalse(h.containsKey("_source"))
assertFalse(h.containsKey("_index"))
// should be the _source content only
assertTrue(h.containsKey("msg"))
assertThat(
runner.getProvenanceEvents().stream().filter({ pe ->
pe.getEventType() == ProvenanceEventType.RECEIVE &&
pe.getAttribute("uuid") == hit.getAttribute("uuid")
}).count(),
is(1L)
)
})
reset(runner)
// test splitting hits - metadata only format
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.METADATA_ONLY.getValue())
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 10, 0, 0)
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(
{ hit ->
hit.assertAttributeEquals("hit.count", "1")
assertOutputContent(hit.getContent(), 1, false)
final Map<String, Object> h = OBJECT_MAPPER.readValue(hit.getContent(), Map.class)
assertFalse(h.isEmpty())
assertFalse(h.containsKey("_source"))
assertTrue(h.containsKey("_index"))
assertThat(
runner.getProvenanceEvents().stream().filter({ pe ->
pe.getEventType() == ProvenanceEventType.RECEIVE &&
@ -197,9 +246,10 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
aggs: [ term_agg: [ terms: [ field: "msg" ] ], term_agg2: [ terms: [ field: "msg" ] ] ]
]))
// test aggregations (no splitting)
// test aggregations (no splitting) - full aggregation format
final TestRunner runner = createRunner(true)
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, query)
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.FULL.getValue())
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 1)
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10")
@ -208,30 +258,57 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
aggregations.assertAttributeNotExists("aggregation.name")
// count == 1 because aggregations is a single Map rather than a List of Maps, even when there are multiple aggs
assertOutputContent(aggregations.getContent(), 1, false)
Map<String, Object> agg = OBJECT_MAPPER.readValue(aggregations.getContent(), Map.class)
// agg Map of 2 Maps (buckets and metadata)
assertThat(agg.size(), is(2))
agg.keySet().forEach({ aggName ->
final Map<String, Object> termAgg = agg.get(aggName) as Map<String, Object>
assertInstanceOf(List.class, termAgg.get("buckets"))
assertTrue(termAgg.containsKey("doc_count_error_upper_bound"))
})
reset(runner)
// test with the query parameter and no incoming connection
// test with the query parameter and no incoming connection - buckets only aggregation format
runner.setIncomingConnection(false)
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.BUCKETS_ONLY.getValue())
runner.run(1, true, true)
testCounts(runner, 0, 1, 0, 1)
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10")
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).get(0).assertAttributeNotExists("aggregation.number")
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).get(0).assertAttributeNotExists("aggregation.name")
final MockFlowFile singleAgg = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).get(0)
singleAgg.assertAttributeNotExists("aggregation.number")
singleAgg.assertAttributeNotExists("aggregation.name")
agg = OBJECT_MAPPER.readValue(singleAgg.getContent(), Map.class)
// agg Map of 2 Lists (bucket contents only)
assertThat(agg.size(), is(2))
agg.keySet().forEach({ aggName ->
final List<Map<String, Object>> termAgg = agg.get(aggName) as List<Map<String, Object>>
assertThat(termAgg.size(), is(5))
termAgg.forEach({a ->
assertTrue(a.containsKey("key"))
assertTrue(a.containsKey("doc_count"))
})
})
reset(runner)
// test splitting aggregations
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT)
// test splitting aggregations - metadata only aggregation format
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.METADATA_ONLY.getValue())
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 2)
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10")
int a = 0
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).forEach(
{ agg ->
agg.assertAttributeEquals("aggregation.name", a == 0 ? "term_agg" : "term_agg2")
agg.assertAttributeEquals("aggregation.number", Integer.toString(++a))
assertOutputContent(agg.getContent(), 1, false)
{ termAgg ->
termAgg.assertAttributeEquals("aggregation.name", a == 0 ? "term_agg" : "term_agg2")
termAgg.assertAttributeEquals("aggregation.number", Integer.toString(++a))
assertOutputContent(termAgg.getContent(), 1, false)
Map<String, Object> aggContent = OBJECT_MAPPER.readValue(termAgg.getContent(), Map.class)
// agg Map (metadata, no buckets)
assertTrue(aggContent.containsKey("doc_count_error_upper_bound"))
assertFalse(aggContent.containsKey("buckets"))
}
)
reset(runner)
@ -248,15 +325,16 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, query)
runner.setProperty(AbstractJsonQueryElasticsearch.INDEX, "\${es.index}")
runner.setProperty(AbstractJsonQueryElasticsearch.TYPE, "\${es.type}")
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.FULL.getValue())
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 2)
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "10")
a = 0
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_AGGREGATIONS).forEach(
{ agg ->
agg.assertAttributeEquals("aggregation.name", a == 0 ? "term_agg" : "term_agg2")
agg.assertAttributeEquals("aggregation.number", Integer.toString(++a))
assertOutputContent(agg.getContent(), 1, false)
{ termAgg ->
termAgg.assertAttributeEquals("aggregation.name", a == 0 ? "term_agg" : "term_agg2")
termAgg.assertAttributeEquals("aggregation.number", Integer.toString(++a))
assertOutputContent(termAgg.getContent(), 1, false)
}
)
}

View File

@ -17,8 +17,11 @@
package org.apache.nifi.processors.elasticsearch
import org.apache.nifi.components.AllowableValue
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.processors.elasticsearch.api.AggregationResultsFormat
import org.apache.nifi.processors.elasticsearch.api.PaginationType
import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy
import org.apache.nifi.processors.elasticsearch.api.SearchResultsFormat
import org.apache.nifi.provenance.ProvenanceEventType
import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.TestRunner
@ -46,12 +49,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
"'%s' validated against 'not-enum' is invalid because Given value not found in allowed set '%s'\n" +
"'%s' validated against 'not-a-period' is invalid because Must be of format <duration> <TimeUnit> where <duration> " +
"is a non-negative integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days\n",
AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE.getName(),
[
AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL,
AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SEARCH_AFTER,
AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME
].join(", "),
AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE.getName(), PaginationType.values().collect {p -> p.getValue()}.join(", "),
AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE.getName(),
AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE.getName()
)))
@ -80,7 +78,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
// paged query hits splitting
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT)
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
input = runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 10, 0, 0)
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(
@ -102,7 +100,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
// paged query hits combined
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY)
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY.getValue())
input = runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
hits = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0)
@ -124,7 +122,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
final TestRunner runner = createRunner(false)
final TestElasticsearchClientService service = getService(runner)
service.setThrowErrorInDelete(true)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.SCROLL.getValue())
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([sort: [ msg: "desc" ], query: [ match_all: [:] ]])))
// still expect "success" output for exception during final clean-up
@ -146,7 +144,9 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
final TestRunner runner = createRunner(false)
final TestElasticsearchClientService service = getService(runner)
service.setThrowErrorInDelete(true)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME)
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.FULL.getValue())
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.FULL.getValue())
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.POINT_IN_TIME.getValue())
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([sort: [ msg: "desc" ], query: [ match_all: [:] ]])))
// still expect "success" output for exception during final clean-up
@ -168,7 +168,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
final TestRunner runner = createRunner(false)
final TestElasticsearchClientService service = getService(runner)
service.setThrowErrorInPit(true)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.POINT_IN_TIME.getValue())
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([sort: [ msg: "desc" ], query: [ match_all: [:] ]])))
// expect "failure" output for exception during query setup
@ -189,7 +189,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
void testQuerySortError() {
// test PiT without sort
final TestRunner runner = createRunner(false)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.POINT_IN_TIME.getValue())
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [ match_all: [:] ]])))
// expect "failure" output for exception during query setup
@ -208,7 +208,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
// test search_after without sort
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SEARCH_AFTER)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.SEARCH_AFTER.getValue())
runOnce(runner)
testCounts(runner, 0, 0, isInput() ? 1 : 0, 0)
assertThat(runner.getLogger().getErrorMessages().stream()
@ -222,27 +222,27 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
// test scroll without sort (should succeed)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.SCROLL.getValue())
runMultiple(runner, 2)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
}
@Test
void testScroll() {
testPagination(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL)
testPagination(PaginationType.SCROLL)
}
@Test
void testPit() {
testPagination(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME)
testPagination(PaginationType.POINT_IN_TIME)
}
@Test
void testSearchAfter() {
testPagination(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SEARCH_AFTER)
testPagination(PaginationType.SEARCH_AFTER)
}
abstract void testPagination(final AllowableValue paginationType)
abstract void testPagination(final PaginationType paginationType)
private void runMultiple(final TestRunner runner, final int maxIterations) {
if (isInput()) {
@ -278,7 +278,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
service.setMaxPages(0)
// test that an empty flow file is produced for a per query setup
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY.getValue())
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
@ -288,7 +288,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
reset(runner)
// test that an empty flow file is produced for a per hit setup
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_HIT)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
@ -298,7 +298,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
reset(runner)
// test that an empty flow file is produced for a per response setup
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_RESPONSE.getValue())
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)

View File

@ -17,7 +17,9 @@
package org.apache.nifi.processors.elasticsearch
import org.apache.nifi.components.AllowableValue
import org.apache.nifi.processors.elasticsearch.api.PaginationType
import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy
import org.apache.nifi.util.TestRunner
import static groovy.json.JsonOutput.prettyPrint
@ -38,12 +40,12 @@ class PaginatedJsonQueryElasticsearchTest extends AbstractPaginatedJsonQueryElas
return true
}
void testPagination(final AllowableValue paginationType) {
void testPagination(final PaginationType paginationType) {
// test flowfile per page
final TestRunner runner = createRunner(false)
final TestElasticsearchClientService service = getService(runner)
service.setMaxPages(2)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue())
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([size: 10, sort: [ msg: "desc"], query: [ match_all: [:] ]])))
runOnce(runner)
@ -60,7 +62,7 @@ class PaginatedJsonQueryElasticsearchTest extends AbstractPaginatedJsonQueryElas
// test hits splitting
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
runOnce(runner)
testCounts(runner, 1, 20, 0, 0)
int count = 0
@ -76,7 +78,7 @@ class PaginatedJsonQueryElasticsearchTest extends AbstractPaginatedJsonQueryElas
// test hits combined
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY.getValue())
runOnce(runner)
testCounts(runner, 1, 1, 0, 0)
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "20")

View File

@ -17,8 +17,10 @@
package org.apache.nifi.processors.elasticsearch
import org.apache.nifi.components.AllowableValue
import org.apache.nifi.components.state.Scope
import org.apache.nifi.processors.elasticsearch.api.PaginationType
import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy
import org.apache.nifi.state.MockStateManager
import org.apache.nifi.util.TestRunner
import org.junit.Test
@ -50,7 +52,7 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
final TestElasticsearchClientService service = getService(runner)
service.setMaxPages(2)
service.setThrowErrorInSearch(false)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.SCROLL.getValue())
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([size: 10, sort: [ msg: "desc"], query: [ match_all: [:] ]])))
// initialise search
@ -73,25 +75,25 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
@Test
void testScrollExpiration() {
testPaginationExpiration(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL)
testPaginationExpiration(PaginationType.SCROLL)
}
@Test
void testPitExpiration() {
testPaginationExpiration(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME)
testPaginationExpiration(PaginationType.POINT_IN_TIME)
}
@Test
void testSearchAfterExpiration() {
testPaginationExpiration(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SEARCH_AFTER)
testPaginationExpiration(PaginationType.SEARCH_AFTER)
}
private void testPaginationExpiration(final AllowableValue paginationType) {
private void testPaginationExpiration(final PaginationType paginationType) {
// test flowfile per page
final TestRunner runner = createRunner(false)
final TestElasticsearchClientService service = getService(runner)
service.setMaxPages(2)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue())
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE, "1 sec")
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([size: 10, sort: [ msg: "desc"], query: [ match_all: [:] ]])))
@ -131,12 +133,12 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
runner.clearTransferState()
}
void testPagination(final AllowableValue paginationType) {
void testPagination(final PaginationType paginationType) {
// test flowfile per page
final TestRunner runner = createRunner(false)
final TestElasticsearchClientService service = getService(runner)
service.setMaxPages(2)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue())
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([size: 10, sort: [ msg: "desc"], query: [ match_all: [:] ]])))
// first page
@ -163,7 +165,7 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
// test hits splitting
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
// first page
runOnce(runner)
@ -197,7 +199,7 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
// test hits combined
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY.getValue())
// hits are combined from all pages within a single trigger of the processor
runOnce(runner)
testCounts(runner, 0, 1, 0, 0)
@ -211,7 +213,7 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
assertThat(runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty(), is(true))
}
private static void assertState(final MockStateManager stateManager, final AllowableValue paginationType,
private static void assertState(final MockStateManager stateManager, final PaginationType paginationType,
final int hitCount, final int pageCount) {
stateManager.assertStateEquals(SearchElasticsearch.STATE_HIT_COUNT, Integer.toString(hitCount), Scope.LOCAL)
stateManager.assertStateEquals(SearchElasticsearch.STATE_PAGE_COUNT, Integer.toString(pageCount), Scope.LOCAL)
@ -220,17 +222,17 @@ class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTes
assertThat(Long.parseLong(pageExpirationTimestamp) > Instant.now().toEpochMilli(), is(true))
switch (paginationType) {
case AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SCROLL:
case PaginationType.SCROLL:
stateManager.assertStateEquals(SearchElasticsearch.STATE_SCROLL_ID, "scrollId-${pageCount}", Scope.LOCAL)
stateManager.assertStateNotSet(SearchElasticsearch.STATE_PIT_ID, Scope.LOCAL)
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SEARCH_AFTER, Scope.LOCAL)
break
case AbstractPaginatedJsonQueryElasticsearch.PAGINATION_POINT_IN_TIME:
case PaginationType.POINT_IN_TIME:
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SCROLL_ID, Scope.LOCAL)
stateManager.assertStateEquals(SearchElasticsearch.STATE_PIT_ID, "pitId-${pageCount}", Scope.LOCAL)
stateManager.assertStateEquals(SearchElasticsearch.STATE_SEARCH_AFTER, "[\"searchAfter-${pageCount}\"]", Scope.LOCAL)
break
case AbstractPaginatedJsonQueryElasticsearch.PAGINATION_SEARCH_AFTER:
case PaginationType.SEARCH_AFTER:
stateManager.assertStateNotSet(SearchElasticsearch.STATE_SCROLL_ID, Scope.LOCAL)
stateManager.assertStateNotSet(SearchElasticsearch.STATE_PIT_ID, Scope.LOCAL)
stateManager.assertStateEquals(SearchElasticsearch.STATE_SEARCH_AFTER, "[\"searchAfter-${pageCount}\"]", Scope.LOCAL)

View File

@ -17,14 +17,14 @@
package org.apache.nifi.elasticsearch;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
public class MapBuilder {
private final Map<String, Object> toBuild;
public MapBuilder() {
toBuild = new HashMap<>();
toBuild = new LinkedHashMap<>();
}
public MapBuilder of(final String key, final Object value) {

View File

@ -87,7 +87,7 @@ language governing permissions and limitations under the License. -->
</activation>
<properties>
<!-- also update the default Elasticsearch version in nifi-elasticsearch-test-utils#src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java-->
<elasticsearch_docker_image>8.5.0</elasticsearch_docker_image>
<elasticsearch_docker_image>8.5.3</elasticsearch_docker_image>
<elasticsearch.elastic.password>s3cret</elasticsearch.elastic.password>
</properties>
<build>
@ -118,7 +118,7 @@ language governing permissions and limitations under the License. -->
<profile>
<id>elasticsearch7</id>
<properties>
<elasticsearch_docker_image>7.17.7</elasticsearch_docker_image>
<elasticsearch_docker_image>7.17.8</elasticsearch_docker_image>
</properties>
</profile>
</profiles>