From 658f2547d8f3966375a629d28239ce12b9a1e333 Mon Sep 17 00:00:00 2001 From: Ryan Van Den Bos Date: Wed, 12 Apr 2023 10:21:57 +0100 Subject: [PATCH] NIFI-11430 - PaginatedJsonQueryElasticsearch processors should not output empty FlowFile if hits have been found; PaginatedJsonQueryElasticsearch processors should be able to use _source and _meta only result formats when grouping by query This closes #7163 Signed-off-by: Chris Sampson --- .../AbstractJsonQueryElasticsearch.java | 2 +- ...stractPaginatedJsonQueryElasticsearch.java | 9 +- .../AbstractJsonQueryElasticsearchTest.groovy | 39 +++-- ...PaginatedJsonQueryElasticsearchTest.groovy | 150 ++++++++++++++---- ...PaginatedJsonQueryElasticsearchTest.groovy | 98 +++++++----- .../AbstractElasticsearchITBase.java | 2 +- .../nifi-elasticsearch-bundle/pom.xml | 4 +- 7 files changed, 199 insertions(+), 105 deletions(-) diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java index 1000779143..f12409b074 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java @@ -367,7 +367,7 @@ public abstract class AbstractJsonQueryElasticsearch> formatHits(final List> hits) { + List> formatHits(final List> hits) { final List> formattedHits; if (hitFormat == SearchResultsFormat.METADATA_ONLY) { diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java index 01e5994c47..f26a5a3c52 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java @@ -75,6 +75,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs .build(); static final List paginatedPropertyDescriptors; + static { final List descriptors = new ArrayList<>(); descriptors.add(QUERY_ATTRIBUTE); @@ -239,7 +240,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs private void combineHits(final List> hits, final PaginatedJsonQueryParameters paginatedJsonQueryParameters, final ProcessSession session, final FlowFile parent, - final Map attributes, final List hitsFlowFiles) { + final Map attributes, final List hitsFlowFiles, final boolean newQuery) { if (hits != null && !hits.isEmpty()) { final FlowFile hitFlowFile; final boolean append = !hitsFlowFiles.isEmpty(); @@ -251,7 +252,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs hitsFlowFiles.add(writeCombinedHitFlowFile(paginatedJsonQueryParameters.getHitCount() + hits.size(), hits, session, hitFlowFile, attributes, append)); - } else if (isOutputNoHits()) { + } else if (isOutputNoHits() && newQuery) { final FlowFile hitFlowFile = createChildFlowFile(session, parent); hitsFlowFiles.add(writeHitFlowFile(0, "", session, hitFlowFile, attributes)); } @@ -271,7 +272,9 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs attributes.put("page.number", Integer.toString(paginatedJsonQueryParameters.getPageCount())); if (hitStrategy == ResultOutputStrategy.PER_QUERY) { - combineHits(hits, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles); + + final List> formattedHits = formatHits(hits); + combineHits(formattedHits, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles, newQuery); // output results if it seems we've combined all available results (i.e. no hits in this page and therefore no more expected) if (!hitsFlowFiles.isEmpty() && (hits == null || hits.isEmpty())) { diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy index 670b9599b0..2eb2b91653 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy @@ -42,9 +42,8 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf import static org.junit.jupiter.api.Assertions.assertNotNull import static org.junit.jupiter.api.Assertions.assertThrows import static org.junit.jupiter.api.Assertions.assertTrue - abstract class AbstractJsonQueryElasticsearchTest

{ - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() + static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() static final String INDEX_NAME = "messages" @@ -101,8 +100,9 @@ abstract class AbstractJsonQueryElasticsearchTest

r.getValue()) .collect(Collectors.joining(", ")) final String expectedAllowedSplitHits = processor instanceof AbstractPaginatedJsonQueryElasticsearch - ? ResultOutputStrategy.values().collect {r -> r.getValue()}.join(", ") - : nonPaginatedResultOutputStrategies + ? ResultOutputStrategy.values().collect { r -> r.getValue() }.join(", ") + : ResultOutputStrategy.getNonPaginatedResponseOutputStrategies().stream() + .map(r -> r.getValue()).collect(Collectors.joining(", ")) final AssertionError assertionError = assertThrows(AssertionError.class, runner.&run) assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 8 validation failures:\n" + @@ -130,7 +130,7 @@ abstract class AbstractJsonQueryElasticsearchTest

> result = OBJECT_MAPPER.readValue(hits.getContent(), List.class) result.forEach({ hit -> - final Map h = ((Map)hit) + final Map h = ((Map) hit) assertFalse(h.isEmpty()) assertTrue(h.containsKey("_source")) assertTrue(h.containsKey("_index")) @@ -211,14 +211,13 @@ abstract class AbstractJsonQueryElasticsearchTest

- pe.getEventType() == ProvenanceEventType.RECEIVE && - pe.getAttribute("uuid") == hits.getAttribute("uuid") + pe.getEventType() == ProvenanceEventType.RECEIVE }).count(), is(0L) ) @@ -247,8 +246,8 @@ abstract class AbstractJsonQueryElasticsearchTest

final List> termAgg = agg.get(aggName) as List> assertThat(termAgg.size(), is(5)) - termAgg.forEach({a -> + termAgg.forEach({ a -> assertTrue(a.containsKey("key")) assertTrue(a.containsKey("doc_count")) }) @@ -321,8 +320,8 @@ abstract class AbstractJsonQueryElasticsearchTest

{ abstract boolean isInput() @@ -40,7 +42,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ @Test void testInvalidPaginationProperties() { final TestRunner runner = createRunner(false) - runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [ match_all: [:] ]]))) + runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]]]))) runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE, "not-a-period") runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, "not-enum") @@ -49,7 +51,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ "'%s' validated against 'not-enum' is invalid because Given value not found in allowed set '%s'\n" + "'%s' validated against 'not-a-period' is invalid because Must be of format where " + "is a non-negative integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days\n", - AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE.getName(), PaginationType.values().collect {p -> p.getValue()}.join(", "), + AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE.getName(), PaginationType.values().collect { p -> p.getValue() }.join(", "), AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE.getName(), AbstractPaginatedJsonQueryElasticsearch.PAGINATION_KEEP_ALIVE.getName() ))) @@ -59,7 +61,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ void testSinglePage() { // paged query hits (no splitting) final TestRunner runner = createRunner(false) - runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [ match_all: [:] ]]))) + runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]]]))) MockFlowFile input = runOnce(runner) testCounts(runner, isInput() ? 1 : 0, 1, 0, 0) FlowFile hits = runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0) @@ -117,13 +119,104 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ assertSendEvent(runner, input) } + static void assertFormattedResult(final SearchResultsFormat searchResultsFormat, final Map hit) { + assertFalse(hit.isEmpty()) + switch(searchResultsFormat) { + case SearchResultsFormat.SOURCE_ONLY: + assertFalse(hit.containsKey("_source")) + assertFalse(hit.containsKey("_index")) + break + case SearchResultsFormat.METADATA_ONLY: + assertFalse(hit.containsKey("_source")) + assertTrue(hit.containsKey("_index")) + break + case SearchResultsFormat.FULL: + assertTrue(hit.containsKey("_source")) + assertTrue(hit.containsKey("_index")) + break + default: + throw new IllegalArgumentException("Unknown SearchResultsFormat value: " + searchResultsFormat.toString()) + } + } + + private void assertResultsFormat(final TestRunner runner, final ResultOutputStrategy resultOutputStrategy, final SearchResultsFormat searchResultsFormat) { + int flowFileCount + String hitsCount + boolean ndjson = false + + switch (resultOutputStrategy) { + case ResultOutputStrategy.PER_QUERY: + flowFileCount = 1 + hitsCount = "10" + ndjson = true + break + case ResultOutputStrategy.PER_HIT: + flowFileCount = 10 + hitsCount = "1" + break + case ResultOutputStrategy.PER_RESPONSE: + flowFileCount = 1 + hitsCount = "10" + break + default: + throw new IllegalArgumentException("Unknown ResultOutputStrategy value: " + resultOutputStrategy.toString()) + } + + // Test Relationship counts + testCounts(runner, isInput() ? 1 : 0, flowFileCount, 0, 0) + + // Per response outputs an array of values + runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach({ hit -> + hit.assertAttributeEquals("hit.count", hitsCount) + assertOutputContent(hit.getContent(), hitsCount as int, ndjson) + if (ResultOutputStrategy.PER_RESPONSE == resultOutputStrategy) { + OBJECT_MAPPER.readValue(hit.getContent(), ArrayList.class).forEach(h -> { + assertFormattedResult(searchResultsFormat, h as Map) + }) + } else { + final Map h = OBJECT_MAPPER.readValue(hit.getContent(), Map.class) + assertFormattedResult(searchResultsFormat, h) + } + assertThat( + runner.getProvenanceEvents().stream().filter({ pe -> + pe.getEventType() == ProvenanceEventType.RECEIVE && + pe.getAttribute("uuid") == hit.getAttribute("uuid") + }).count(), + is(1L) + ) + }) + } + + @Test + void testResultsFormat() throws Exception { + for (final ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) { + final TestRunner runner = createRunner(false) + runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: "asc"]]]]))) + runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, resultOutputStrategy.getValue()) + + // Test against each results format + for (final SearchResultsFormat searchResultsFormat : SearchResultsFormat.values()) { + runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, searchResultsFormat.getValue()) + + // Test against each pagination type + for (final PaginationType paginationType : PaginationType.values()) { + runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue()) + + runOnce(runner) + assertResultsFormat(runner, resultOutputStrategy, searchResultsFormat) + reset(runner) + } + } + } + } + @Test void testScrollError() { final TestRunner runner = createRunner(false) final TestElasticsearchClientService service = getService(runner) service.setThrowErrorInDelete(true) runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.SCROLL.getValue()) - runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([sort: [ msg: "desc" ], query: [ match_all: [:] ]]))) + runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([sort: [msg: "desc"], query: [match_all: [:]]]))) // still expect "success" output for exception during final clean-up runMultiple(runner, 2) @@ -147,7 +240,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, SearchResultsFormat.FULL.getValue()) runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_FORMAT, AggregationResultsFormat.FULL.getValue()) runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.POINT_IN_TIME.getValue()) - runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([sort: [ msg: "desc" ], query: [ match_all: [:] ]]))) + runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([sort: [msg: "desc"], query: [match_all: [:]]]))) // still expect "success" output for exception during final clean-up runMultiple(runner, 2) @@ -169,7 +262,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ final TestElasticsearchClientService service = getService(runner) service.setThrowErrorInPit(true) runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.POINT_IN_TIME.getValue()) - runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([sort: [ msg: "desc" ], query: [ match_all: [:] ]]))) + runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([sort: [msg: "desc"], query: [match_all: [:]]]))) // expect "failure" output for exception during query setup runOnce(runner) @@ -190,7 +283,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ // test PiT without sort final TestRunner runner = createRunner(false) runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, PaginationType.POINT_IN_TIME.getValue()) - runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [ match_all: [:] ]]))) + runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]]]))) // expect "failure" output for exception during query setup runOnce(runner) @@ -265,46 +358,35 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ is(1L) ) } else { - assertThat(runner.getProvenanceEvents().stream().filter({ pe -> pe.getEventType() == ProvenanceEventType.SEND}).count(), is(0L)) + assertThat(runner.getProvenanceEvents().stream().filter({ pe -> pe.getEventType() == ProvenanceEventType.SEND }).count(), is(0L)) } } @Test - void testNoHitsFlowFileIsProducedForEachResultSplitSetup() { + void testEmptyHitsFlowFileIsProducedForEachResultSplitSetup() { final TestRunner runner = createRunner(false) final TestElasticsearchClientService service = getService(runner) - runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]]]))) + runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: "asc"]]]]))) runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.OUTPUT_NO_HITS, "true") service.setMaxPages(0) - // test that an empty flow file is produced for a per query setup - runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY.getValue()) - runOnce(runner) - testCounts(runner, isInput() ? 1 : 0, 1, 0, 0) - runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "0") - runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1") - runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize() == 0 - reset(runner) + for (final PaginationType paginationType : PaginationType.values()) { + runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue()) - // test that an empty flow file is produced for a per hit setup - runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue()) - runOnce(runner) - testCounts(runner, isInput() ? 1 : 0, 1, 0, 0) + for (final ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) { + // test that an empty flow file is produced for a per query setup + runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, resultOutputStrategy.getValue()) + runOnce(runner) + testCounts(runner, isInput() ? 1 : 0, 1, 0, 0) - runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "0") - runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1") - runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize() == 0 - reset(runner) + runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "0") + runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1") + runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize() == 0 + reset(runner) + } + } - // test that an empty flow file is produced for a per response setup - runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_RESPONSE.getValue()) - runOnce(runner) - testCounts(runner, isInput() ? 1 : 0, 1, 0, 0) - runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "0") - runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1") - runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize() == 0 - reset(runner) } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy index 992b548b94..30c6eea6a3 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PaginatedJsonQueryElasticsearchTest.groovy @@ -17,7 +17,6 @@ package org.apache.nifi.processors.elasticsearch - import org.apache.nifi.processors.elasticsearch.api.PaginationType import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy import org.apache.nifi.util.TestRunner @@ -40,54 +39,65 @@ class PaginatedJsonQueryElasticsearchTest extends AbstractPaginatedJsonQueryElas return true } + static void validatePagination(final TestRunner runner, final ResultOutputStrategy resultOutputStrategy) { + switch (resultOutputStrategy) { + case ResultOutputStrategy.PER_RESPONSE: + testCounts(runner, 1, 2, 0, 0) + int page = 1 + runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach( + { hit -> + hit.assertAttributeEquals("hit.count", "10") + hit.assertAttributeEquals("page.number", Integer.toString(page++)) + } + ) + break + case ResultOutputStrategy.PER_QUERY: + testCounts(runner, 1, 1, 0, 0) + runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "20") + // the "last" page.number is used, so 2 here because there were 2 pages of hits + runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "2") + assertThat( + runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getContent().split("\n").length, + is(20) + ) + break + case ResultOutputStrategy.PER_HIT: + testCounts(runner, 1, 20, 0, 0) + int count = 0 + runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach( + { hit -> + hit.assertAttributeEquals("hit.count", "1") + // 10 hits per page, so first 10 flow files should be page.number 1, the rest page.number 2 + hit.assertAttributeEquals("page.number", Integer.toString(Math.ceil(++count / 10) as int)) + } + ) + break + default: + throw new IllegalArgumentException("Unknown ResultOutputStrategy value: " + resultOutputStrategy) + } + } + void testPagination(final PaginationType paginationType) { - // test flowfile per page final TestRunner runner = createRunner(false) final TestElasticsearchClientService service = getService(runner) service.setMaxPages(2) - runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue()) - runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([size: 10, sort: [ msg: "desc"], query: [ match_all: [:] ]]))) + runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([size: 10, sort: [msg: "desc"], query: [match_all: [:]]]))) - runOnce(runner) - testCounts(runner, 1, 2, 0, 0) - int page = 1 - runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach( - { hit -> - hit.assertAttributeEquals("hit.count", "10") - hit.assertAttributeEquals("page.number", Integer.toString(page++)) - } - ) - runner.getStateManager().assertStateNotSet() - reset(runner) + for (final ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) { + runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, resultOutputStrategy.getValue()) + runOnce(runner) + validatePagination(runner, resultOutputStrategy) + runner.getStateManager().assertStateNotSet() + reset(runner) - // test hits splitting - runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue()) - runOnce(runner) - testCounts(runner, 1, 20, 0, 0) - int count = 0 - runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach( - { hit -> - hit.assertAttributeEquals("hit.count", "1") - // 10 hits per page, so first 10 flowfiles should be page.number 1, the rest page.number 2 - hit.assertAttributeEquals("page.number", Integer.toString(Math.ceil(++count / 10) as int)) - } - ) - runner.getStateManager().assertStateNotSet() - reset(runner) - - - // test hits combined - runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY.getValue()) - runOnce(runner) - testCounts(runner, 1, 1, 0, 0) - runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "20") - // the "last" page.number is used, so 2 here because there were 2 pages of hits - runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "2") - assertThat( - runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getContent().split("\n").length, - is(20) - ) - runner.getStateManager().assertStateNotSet() + // Check that OUTPUT_NO_HITS true doesn't have any adverse effects on pagination + runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, "true") + runOnce(runner) + validatePagination(runner, resultOutputStrategy) + // Unset OUTPUT_NO_HITS + runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, "false") + reset(runner) + } } -} +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java index 5c2b56ba94..3b23e3c729 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java @@ -51,7 +51,7 @@ import static org.apache.http.auth.AuthScope.ANY; public abstract class AbstractElasticsearchITBase { // default Elasticsearch version should (ideally) match that in the nifi-elasticsearch-bundle#pom.xml for the integration-tests profile protected static final DockerImageName IMAGE = DockerImageName - .parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.7.0")); + .parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.7.1")); protected static final String ELASTIC_USER_PASSWORD = System.getProperty("elasticsearch.elastic_user.password", RandomStringUtils.randomAlphanumeric(10, 20)); private static final int PORT = 9200; protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = new ElasticsearchContainer(IMAGE) diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml index 5364ec52a1..571a1fbb3d 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml @@ -101,7 +101,7 @@ language governing permissions and limitations under the License. --> - 8.7.0 + 8.7.1 s3cret @@ -132,7 +132,7 @@ language governing permissions and limitations under the License. --> elasticsearch7 - 7.17.9 + 7.17.10