From 2ad33eea8002db00b5bf80edc7b2dc30cb3d1557 Mon Sep 17 00:00:00 2001 From: Ryan Van Den Bos Date: Tue, 22 Nov 2022 09:41:44 +0000 Subject: [PATCH] NIFI-10845 - JsonQueryElasticsearch processors are not outputting an empty flow file for a combined response with output_no_hits set to true Signed-off-by: Chris Sampson This closes #6701 --- .../AbstractJsonQueryElasticsearch.java | 8 +++- ...stractPaginatedJsonQueryElasticsearch.java | 3 ++ ...PaginatedJsonQueryElasticsearchTest.groovy | 39 +++++++++++++++++++ 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java index 1ce0f7f414..f154cf2790 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java @@ -105,6 +105,10 @@ public abstract class AbstractJsonQueryElasticsearch clientService = new AtomicReference<>(null); @@ -277,8 +281,8 @@ public abstract class AbstractJsonQueryElasticsearch attributes) { + FlowFile writeHitFlowFile(final int count, final String json, final ProcessSession session, + final FlowFile hitFlowFile, final Map attributes) { final FlowFile ff = session.write(hitFlowFile, out -> out.write(json.getBytes())); attributes.put("hit.count", Integer.toString(count)); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java index 2e8eab6eeb..ec1a020ad7 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java @@ -273,6 +273,9 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs hitsFlowFiles.add(writeCombinedHitFlowFile(paginatedJsonQueryParameters.getHitCount() + hits.size(), hits, session, hitFlowFile, attributes, append)); + } else if (getOutputNoHits()) { + final FlowFile hitFlowFile = createChildFlowFile(session, parent); + hitsFlowFiles.add(writeHitFlowFile(0, "", session, hitFlowFile, attributes)); } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy index 274e6a2fd5..0ec4470f76 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearchTest.groovy @@ -268,4 +268,43 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ assertThat(runner.getProvenanceEvents().stream().filter({ pe -> pe.getEventType() == ProvenanceEventType.SEND}).count(), is(0L)) } } + + @Test + void testNoHitsFlowFileIsProducedForEachResultSplitSetup() { + final TestRunner runner = createRunner(false) + final TestElasticsearchClientService service = getService(runner) + runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]]]))) + runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.OUTPUT_NO_HITS, "true") + service.setMaxPages(0) + + // test that an empty flow file is produced for a per query setup + runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY) + runOnce(runner) + testCounts(runner, isInput() ? 1 : 0, 1, 0, 0) + + runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "0") + runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1") + runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize() == 0 + reset(runner) + + // test that an empty flow file is produced for a per hit setup + runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_HIT) + runOnce(runner) + testCounts(runner, isInput() ? 1 : 0, 1, 0, 0) + + runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "0") + runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1") + runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize() == 0 + reset(runner) + + // test that an empty flow file is produced for a per response setup + runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE) + runOnce(runner) + testCounts(runner, isInput() ? 1 : 0, 1, 0, 0) + + runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "0") + runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1") + runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize() == 0 + reset(runner) + } }